Repository: samza Updated Branches: refs/heads/master 666835186 -> 2b5970d81
SAMZA-1870: hdfs offset comparator to handle end of stream offset. This happens particularly when using HDFS as a bootstrap stream: org.apache.samza.SamzaException: Invalid offset for MultiFileHdfsReader: END_OF_STREAM at org.apache.samza.system.hdfs.reader.MultiFileHdfsReader.getCurFileIndex(MultiFileHdfsReader.java:64) at org.apache.samza.system.hdfs.HdfsSystemAdmin.offsetComparator(HdfsSystemAdmin.java:224) at org.apache.samza.system.chooser.BootstrappingChooser.org$apache$samza$system$chooser$BootstrappingChooser$$checkOffset(BootstrappingChooser.scala:274) at org.apache.samza.system.chooser.BootstrappingChooser.choose(BootstrappingChooser.scala:204) at org.apache.samza.system.chooser.DefaultChooser.choose(DefaultChooser.scala:294) at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:210) at org.apache.samza.task.AsyncRunLoop.chooseEnvelope(AsyncRunLoop.java:208) at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:156) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:787) at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101) at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148) Author: Hai Lu <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #633 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2b5970d8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2b5970d8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2b5970d8 Branch: refs/heads/master Commit: 2b5970d815999870d910792120dea0180ab856a6 Parents: 6668351 Author: Hai Lu <[email protected]> Authored: Tue Sep 11 09:57:59 2018 -0700 Committer: xiliu <[email protected]> Committed: Tue Sep 11 09:57:59 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/hdfs/HdfsSystemAdmin.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2b5970d8/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java index 28a1bac..0d50f26 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; +import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; @@ -221,6 +222,17 @@ public class HdfsSystemAdmin implements SystemAdmin { if (StringUtils.isBlank(offset1) || StringUtils.isBlank(offset2)) { return null; } + /* + * Properly handle END_OF_STREAM offset here. If both are END_OF_STREAM, + * then they are equal. Otherwise END_OF_STREAM is always greater than any + * other offsets. + */ + if (offset1.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) { + return offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET) ? 0 : 1; + } + if (offset2.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) { + return -1; + } int fileIndex1 = MultiFileHdfsReader.getCurFileIndex(offset1); int fileIndex2 = MultiFileHdfsReader.getCurFileIndex(offset2); if (fileIndex1 == fileIndex2) {
