Repository: samza
Updated Branches:
  refs/heads/master 666835186 -> 2b5970d81


SAMZA-1870: Make the HDFS offset comparator handle the END_OF_STREAM offset

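HdfsSystemAdmin.offsetComparator fails with the exception below when one of its offsets is END_OF_STREAM, because it passes that offset straight to MultiFileHdfsReader.getCurFileIndex. This happens in particular when HDFS is used as a bootstrap stream, since BootstrappingChooser compares offsets through this comparator: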

org.apache.samza.SamzaException: Invalid offset for MultiFileHdfsReader: END_OF_STREAM
    at org.apache.samza.system.hdfs.reader.MultiFileHdfsReader.getCurFileIndex(MultiFileHdfsReader.java:64)
    at org.apache.samza.system.hdfs.HdfsSystemAdmin.offsetComparator(HdfsSystemAdmin.java:224)
    at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:274)
    at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204)
    at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294)
    at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210)
    at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208)
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:787)
    at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
    at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)

Author: Hai Lu <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes #633 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2b5970d8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2b5970d8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2b5970d8

Branch: refs/heads/master
Commit: 2b5970d815999870d910792120dea0180ab856a6
Parents: 6668351
Author: Hai Lu <[email protected]>
Authored: Tue Sep 11 09:57:59 2018 -0700
Committer: xiliu <[email protected]>
Committed: Tue Sep 11 09:57:59 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/hdfs/HdfsSystemAdmin.java   | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2b5970d8/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
index 28a1bac..0d50f26 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
@@ -221,6 +222,17 @@ public class HdfsSystemAdmin implements SystemAdmin {
     if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) {
       return null;
     }
+    /*
+     * Properly handle END_OF_STREAM offset here. If both are END_OF_STREAM,
+     * then they are equal. Otherwise END_OF_STREAM is always greater than any
+     * other offsets.
+     */
+    if (offset1.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
+      return offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET) ? 0 
: 1;
+    }
+    if (offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
+      return -1;
+    }
     int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1);
     int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2);
     if (fileIndex1 == fileIndex2) {

Reply via email to