rmatharu commented on a change in pull request #1353:
URL: https://github.com/apache/samza/pull/1353#discussion_r417563382
##########
File path:
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -363,15 +364,22 @@ File getStoreLocation(String storeName) {
// Step 3
metadata.forEach((systemStream, systemStreamMetadata) -> {
+
// get the partition metadata for each system stream
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = systemStreamMetadata.getSystemStreamPartitionMetadata();
// For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
- Map<SystemStreamPartition, String> offsets = systemStreamToSsp.get(systemStream).stream()
- .collect(
- Collectors.toMap(Function.identity(), ssp -> partitionMetadata.get(ssp.getPartition()).getOldestOffset()));
-
+ // if partitionMetadata was not obtained for any SSP, populate oldest-offset as null
+ // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using lambda will NPE when getOldestOffset() is null
+ Map<SystemStreamPartition, String> offsets = new HashMap<>();
+ for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+ if (partitionMetadata == null || partitionMetadata.get(ssp.getPartition()) == null) {
+ offsets.put(ssp, null);
+ } else {
+ offsets.put(ssp, partitionMetadata.get(ssp.getPartition()).getOldestOffset());
Review comment:
done
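
For context on the JDK-8148463 note in the diff above, here is a minimal standalone sketch of the behavior, not the Samza code itself. The class name `NullValueToMapDemo`, the helper `oldestOffset`, and the use of plain `String` keys in place of `SystemStreamPartition` are all assumptions made for illustration. It shows that `Collectors.toMap` throws a `NullPointerException` when the value mapper returns null, while a plain loop over `HashMap.put` stores the null.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class NullValueToMapDemo {

  // Hypothetical stand-in for the partition metadata lookup:
  // returns null when the metadata map is missing or has no entry for the key.
  static String oldestOffset(Map<String, String> metadata, String key) {
    return metadata == null ? null : metadata.get(key);
  }

  // Collectors.toMap rejects null values (see JDK-8148463),
  // so this throws NullPointerException if any lookup returns null.
  static Map<String, String> viaCollector(List<String> keys, Map<String, String> metadata) {
    return keys.stream()
        .collect(Collectors.toMap(Function.identity(), k -> oldestOffset(metadata, k)));
  }

  // A plain loop with HashMap.put accepts null values.
  static Map<String, String> viaLoop(List<String> keys, Map<String, String> metadata) {
    Map<String, String> offsets = new HashMap<>();
    for (String k : keys) {
      offsets.put(k, oldestOffset(metadata, k));
    }
    return offsets;
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put("p0", "5"); // "p1" deliberately has no metadata
    List<String> keys = Arrays.asList("p0", "p1");

    Map<String, String> offsets = viaLoop(keys, metadata);
    System.out.println("loop stored p1: " + offsets.containsKey("p1"));

    try {
      viaCollector(keys, metadata);
      System.out.println("collector: no exception");
    } catch (NullPointerException e) {
      System.out.println("collector: NullPointerException");
    }
  }
}
```

Callers of the loop-based map still have to handle null offsets, since `containsKey` is true for those entries while `get` returns null.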