rmatharu commented on a change in pull request #1353:
URL: https://github.com/apache/samza/pull/1353#discussion_r417563382



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -363,15 +364,22 @@ File getStoreLocation(String storeName) {
 
     // Step 3
     metadata.forEach((systemStream, systemStreamMetadata) -> {
+
         // get the partition metadata for each system stream
         Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
           systemStreamMetadata.getSystemStreamPartitionMetadata();
 
         // For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
-        Map<SystemStreamPartition, String> offsets = systemStreamToSsp.get(systemStream).stream()
-          .collect(
-              Collectors.toMap(Function.identity(), ssp -> partitionMetadata.get(ssp.getPartition()).getOldestOffset()));
-
+        // if partitionMetadata was not obtained for any SSP, populate oldest-offset as null
+        // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using lambda will NPE when getOldestOffset() is null
+        Map<SystemStreamPartition, String> offsets = new HashMap<>();
+        for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+          if (partitionMetadata == null || partitionMetadata.get(ssp.getPartition()) == null) {
+            offsets.put(ssp, null);
+          } else {
+            offsets.put(ssp, partitionMetadata.get(ssp.getPartition()).getOldestOffset());

Review comment:
       done
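
For context, the JDK-8148463 behavior that the new code comment refers to can be reproduced in isolation. The following is only a minimal sketch, not the Samza code: plain strings stand in for SystemStreamPartition, and the class name, the helper oldestOffset() and the "ssp-0"/"ssp-1" keys are hypothetical. It suggests why Collectors.toMap had to be replaced with an explicit loop once a null oldest offset became possible.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class NullValueToMapSketch {

  // Hypothetical stand-in for partitionMetadata.get(ssp.getPartition()).getOldestOffset():
  // returns null for "ssp-1" to mimic a partition with no oldest offset.
  static String oldestOffset(String ssp) {
    return "ssp-1".equals(ssp) ? null : "0";
  }

  // Collectors.toMap rejects null values (JDK-8148463), so this collect call
  // throws NullPointerException as soon as the value mapper returns null.
  static boolean toMapThrowsOnNull(List<String> ssps) {
    try {
      ssps.stream()
          .collect(Collectors.toMap(Function.identity(), NullValueToMapSketch::oldestOffset));
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // HashMap.put accepts null values, so the explicit loop keeps the key
  // and records null as its offset.
  static Map<String, String> offsetsWithLoop(List<String> ssps) {
    Map<String, String> offsets = new HashMap<>();
    for (String ssp : ssps) {
      offsets.put(ssp, oldestOffset(ssp));
    }
    return offsets;
  }

  public static void main(String[] args) {
    List<String> ssps = Arrays.asList("ssp-0", "ssp-1");
    System.out.println("toMap throws on null value: " + toMapThrowsOnNull(ssps));
    System.out.println("loop keeps null entry: " + offsetsWithLoop(ssps).containsKey("ssp-1"));
  }
}
```

With the loop, callers of the resulting map still need to handle a null offset for such keys.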




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

