This is an automated email from the ASF dual-hosted git repository. fjy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
     new c0195a1  Fix HDFS input source split (#9574)
c0195a1 is described below

commit c0195a19e4e9bef50dd1bb5f1b63d03b4f3e318f
Author: Chi Cao Minh <chi.caom...@imply.io>
AuthorDate: Sat Mar 28 15:45:57 2020 -0700

    Fix HDFS input source split (#9574)

    Fixes an issue where splitting an HDFS input source for use in native
    parallel batch ingestion would cause the subtasks to get a split with an
    invalid HDFS path.
---
 .../apache/druid/inputsource/hdfs/HdfsInputSource.java   |  7 +++++--
 .../druid/inputsource/hdfs/HdfsInputSourceTest.java      | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index be7f3c8..b8c798a 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
@@ -142,8 +143,9 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
     }
   }
 
+  @VisibleForTesting
   @JsonProperty(PROP_PATHS)
-  private List<String> getInputPaths()
+  List<String> getInputPaths()
   {
     return inputPaths;
   }
@@ -199,7 +201,8 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   @Override
   public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
   {
-    return new HdfsInputSource(split.get().toString(), configuration);
+    List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
+    return new HdfsInputSource(paths, configuration);
   }
 
   @Override
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index d2c7820..044930f 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -277,6 +278,21 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       int numSplits = target.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L));
       Assert.assertEquals(NUM_FILE, numSplits);
     }
+
+    @Test
+    public void createCorrectInputSourceWithSplit() throws Exception
+    {
+      // Set maxSplitSize to 1 so that each inputSplit has only one object
+      List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L))
+                                                  .collect(Collectors.toList());
+
+      for (InputSplit<List<Path>> split : splits) {
+        String expectedPath = Iterables.getOnlyElement(split.get()).toString();
+        HdfsInputSource inputSource = (HdfsInputSource) target.withSplit(split);
+        String actualPath = Iterables.getOnlyElement(inputSource.getInputPaths());
+        Assert.assertEquals(expectedPath, actualPath);
+      }
+    }
   }
 
   public static class EmptyPathsTest
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org