bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r429525357



##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input 
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new 
StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new 
ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, 
storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new 
TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = 
taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  public void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+    String envelopeOffset = envelope.getOffset();
+
+    for (String store: this.sspToStores.get(envelopeSSP)) {
+      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+      KeyValueStore keyValueStore = (KeyValueStore) 
this.taskSideInputStorageManager.getStore(store);
+      Collection<Entry<?, ?>> entriesToBeWritten = 
storeProcessor.process(envelope, keyValueStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null we ignore, if the value is null, we issue a 
delete, else we issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+  }

Review comment:
       copied from `TaskSideInputStorageManager`

##########
File path: 
samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestTaskSideInputHandler {
+  private static final String TEST_TASK_NAME = "test-task";
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STORE = "test-store";
+  private static final String TEST_STREAM = "test-stream";
+
+    /**
+   * This test is for cases, when calls to systemAdmin (e.g., 
KafkaSystemAdmin's) get-stream-metadata method return null.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
new Partition(idx)))
+        .collect(Collectors.toSet());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, 
"1", "2")));
+
+
+    TaskSideInputHandler handler = new 
MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStreamMetadata(Collections.singletonMap(new 
SystemStream(TEST_SYSTEM, TEST_STREAM),
+            new SystemStreamMetadata(TEST_STREAM, partitionMetadata)))
+        .addStore(TEST_STORE, ssps)
+        .build();
+
+    handler.init();
+
+    ssps.forEach(ssp -> {
+        String startingOffset = handler.getStartingOffset(
+            new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
ssp.getPartition()));
+        Assert.assertNull("Starting offset should be null", startingOffset);
+      });
+  }
+
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
new Partition(idx)))
+        .collect(Collectors.toSet());
+
+
+    TaskSideInputHandler handler = new 
MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStore(storeName, ssps)
+        .build();
+
+    // set up file and oldest offsets. for even partitions, fileOffsets will 
be larger; for odd partitions oldestOffsets will be larger
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+            return String.valueOf(offset);
+          }));
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+
+            return String.valueOf(offset);
+          }));
+
+    doCallRealMethod().when(handler).getStartingOffsets(fileOffsets, 
oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets = 
handler.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertTrue("Failed to get starting offsets for all ssps", 
startingOffsets.size() == 5);
+    startingOffsets.forEach((ssp, offset) -> {
+        int partitionId = ssp.getPartition().getPartitionId();
+        String expectedOffset = partitionId % 2 == 0
+            // 1 + fileOffset
+            ? 
getOffsetAfter(String.valueOf(ssp.getPartition().getPartitionId() + 10))
+            // oldestOffset
+            : String.valueOf(ssp.getPartition().getPartitionId() + 10);
+        assertEquals("Larger of fileOffsets and oldestOffsets should always be 
chosen", expectedOffset, offset);
+      });

Review comment:
       New: added for a stricter behavior check on `getStartingOffsets`.

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input 
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new 
StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new 
ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, 
storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new 
TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = 
taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  public void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+    String envelopeOffset = envelope.getOffset();
+
+    for (String store: this.sspToStores.get(envelopeSSP)) {
+      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+      KeyValueStore keyValueStore = (KeyValueStore) 
this.taskSideInputStorageManager.getStore(store);
+      Collection<Entry<?, ?>> entriesToBeWritten = 
storeProcessor.process(envelope, keyValueStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null we ignore, if the value is null, we issue a 
delete, else we issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+  }
+
+  public void flush() {
+    this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
+  }
+
+  public String getStartingOffset(SystemStreamPartition ssp) {
+    return this.startingOffsets.get(ssp);
+  }
+
+  public String getLastProcessedOffset(SystemStreamPartition ssp) {
+    return this.lastProcessedOffsets.get(ssp);
+  }
+
+  public void stop() {
+    this.taskSideInputStorageManager.stop(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Gets the starting offsets for the {@link SystemStreamPartition}s 
belonging to all the side input stores.
+   * If the local file offset is available and is greater than the oldest 
available offset from source, uses it,
+   * else falls back to oldest offset in the source.
+   *
+   * @param fileOffsets offsets from the local offset file
+   * @param oldestOffsets oldest offsets from the source
+   * @return a {@link Map} of {@link SystemStreamPartition} to offset
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getStartingOffsets(
+      Map<SystemStreamPartition, String> fileOffsets, 
Map<SystemStreamPartition, String> oldestOffsets) {
+    Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();
+
+    this.sspToStores.keySet().forEach(ssp -> {
+        String fileOffset = fileOffsets.get(ssp);
+        String oldestOffset = oldestOffsets.get(ssp);
+
+        startingOffsets.put(ssp,
+            this.storageManagerUtil.getStartingOffset(
+                ssp, this.systemAdmins.getSystemAdmin(ssp.getSystem()), 
fileOffset, oldestOffset));
+      });
+
+    return startingOffsets;
+  }
+
+  /**
+   * Gets the oldest offset for the {@link SystemStreamPartition}s associated 
with all the store side inputs.
+   *   1. Groups the list of the SSPs based on system stream
+   *   2. Fetches the {@link SystemStreamMetadata} from {@link 
StreamMetadataCache}
+   *   3. Fetches the partition metadata for each system stream and fetch the 
corresponding partition metadata
+   *      and populates the oldest offset for SSPs belonging to the system 
stream.
+   *
+   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest 
offset. If partitionMetadata could not be
+   * obtained for any {@link SystemStreamPartition} the offset for it is 
populated as null.
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getOldestOffsets() {
+    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
+
+    // Step 1
+    Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = 
this.sspToStores.keySet().stream()
+        
.collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
+
+    // Step 2
+    Map<SystemStream, SystemStreamMetadata> metadata = 
JavaConverters.mapAsJavaMapConverter(
+        streamMetadataCache.getStreamMetadata(
+            
JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(),
 false)).asJava();
+
+    // Step 3
+    metadata.forEach((systemStream, systemStreamMetadata) -> {
+
+        // get the partition metadata for each system stream
+        Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata =
+            systemStreamMetadata.getSystemStreamPartitionMetadata();
+
+        // For SSPs belonging to the system stream, use the partition metadata 
to get the oldest offset
+        // if partitionMetadata was not obtained for any SSP, populate 
oldest-offset as null
+        // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using 
lambda will NPE when getOldestOffset() is null
+        for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+          oldestOffsets.put(ssp, 
partitionMetadata.get(ssp.getPartition()).getOldestOffset());
+        }
+      });
+
+    return oldestOffsets;
+  }
+
+  private void validateProcessorConfiguration() {
+    Set<String> stores = this.sspToStores.values().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toSet());
+
+    stores.forEach(storeName -> {
+        if (!storeToProcessor.containsKey(storeName)) {
+          throw new SamzaException(
+              String.format("Side inputs processor missing for store: %s.", 
storeName));
+        }
+      });
+  }
+}

Review comment:
       copied from `TaskSideInputStorageManager`

##########
File path: 
samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestTaskSideInputHandler {
+  private static final String TEST_TASK_NAME = "test-task";
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STORE = "test-store";
+  private static final String TEST_STREAM = "test-stream";
+
+    /**
+   * This test is for cases, when calls to systemAdmin (e.g., 
KafkaSystemAdmin's) get-stream-metadata method return null.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
new Partition(idx)))
+        .collect(Collectors.toSet());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, 
"1", "2")));
+
+
+    TaskSideInputHandler handler = new 
MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStreamMetadata(Collections.singletonMap(new 
SystemStream(TEST_SYSTEM, TEST_STREAM),
+            new SystemStreamMetadata(TEST_STREAM, partitionMetadata)))
+        .addStore(TEST_STORE, ssps)
+        .build();
+
+    handler.init();
+
+    ssps.forEach(ssp -> {
+        String startingOffset = handler.getStartingOffset(
+            new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
ssp.getPartition()));
+        Assert.assertNull("Starting offset should be null", startingOffset);
+      });
+  }
+
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
new Partition(idx)))
+        .collect(Collectors.toSet());
+
+
+    TaskSideInputHandler handler = new 
MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStore(storeName, ssps)
+        .build();
+
+    // set up file and oldest offsets. for even partitions, fileOffsets will 
be larger; for odd partitions oldestOffsets will be larger
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+            return String.valueOf(offset);
+          }));
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+
+            return String.valueOf(offset);
+          }));
+
+    doCallRealMethod().when(handler).getStartingOffsets(fileOffsets, 
oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets = 
handler.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertTrue("Failed to get starting offsets for all ssps", 
startingOffsets.size() == 5);

Review comment:
       adapted from `TaskSideInputStorageManager`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to