This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new efb32810c4 Clean up the core API required for Iceberg extension
(#14614)
efb32810c4 is described below
commit efb32810c48fd9c0bf81cefb15179f74a4cde661
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Fri Jul 21 13:01:33 2023 +0530
Clean up the core API required for Iceberg extension (#14614)
Changes:
- Replace `AbstractInputSourceBuilder` with `InputSourceFactory`
- Move Iceberg-specific logic to `IcebergInputSource`
---
.../druid/iceberg/input/IcebergInputSource.java | 84 +++++++++++++-
.../iceberg/input/IcebergInputSourceTest.java | 45 +++++++-
...rceBuilder.java => HdfsInputSourceFactory.java} | 8 +-
.../druid/storage/hdfs/HdfsStorageDruidModule.java | 4 +-
.../hdfs/HdfsInputSourceAdapterTest.java | 4 +-
.../data/input/s3/S3InputSourceDruidModule.java | 2 +-
...ourceBuilder.java => S3InputSourceFactory.java} | 8 +-
...lderTest.java => S3InputSourceFactoryTest.java} | 6 +-
.../data/input/AbstractInputSourceBuilder.java | 123 ---------------------
.../druid/data/input/InputSourceFactory.java | 44 ++++++++
...ceBuilder.java => LocalInputSourceFactory.java} | 7 +-
.../input/impl/LocalInputSourceAdapterTest.java | 48 +-------
12 files changed, 185 insertions(+), 198 deletions(-)
diff --git
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
index 81899703dc..1d7fc689b2 100644
---
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
+++
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
@@ -22,19 +22,25 @@ package org.apache.druid.iceberg.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
@@ -60,7 +66,7 @@ public class IcebergInputSource implements
SplittableInputSource<List<String>>
private IcebergFilter icebergFilter;
@JsonProperty
- private AbstractInputSourceBuilder warehouseSource;
+ private InputSourceFactory warehouseSource;
private boolean isLoaded = false;
@@ -72,7 +78,7 @@ public class IcebergInputSource implements
SplittableInputSource<List<String>>
@JsonProperty("namespace") String namespace,
@JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
- @JsonProperty("warehouseSource") AbstractInputSourceBuilder
warehouseSource
+ @JsonProperty("warehouseSource") InputSourceFactory warehouseSource
)
{
this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot
be null");
@@ -170,7 +176,77 @@ public class IcebergInputSource implements
SplittableInputSource<List<String>>
getTableName(),
getIcebergFilter()
);
- delegateInputSource = warehouseSource.setupInputSource(snapshotDataFiles);
+ if (snapshotDataFiles.isEmpty()) {
+ delegateInputSource = new EmptyInputSource();
+ } else {
+ delegateInputSource = warehouseSource.create(snapshotDataFiles);
+ }
isLoaded = true;
}
+
+ /**
+ * This input source is used in place of a delegate input source if there
are no input file paths.
+ * Certain input sources cannot be instantiated with an empty input file
list and so composing input sources such as IcebergInputSource
+ * may use this input source as delegate in such cases.
+ */
+ private static class EmptyInputSource implements SplittableInputSource
+ {
+ @Override
+ public boolean needsFormat()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputSourceReader reader(
+ InputRowSchema inputRowSchema,
+ @Nullable InputFormat inputFormat,
+ File temporaryDirectory
+ )
+ {
+ return new InputSourceReader()
+ {
+ @Override
+ public CloseableIterator<InputRow> read(InputStats inputStats)
+ {
+ return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+ });
+ }
+
+ @Override
+ public CloseableIterator<InputRowListPlusRawValues> sample()
+ {
+ return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+ });
+ }
+ };
+ }
+
+ @Override
+ public Stream<InputSplit> createSplits(
+ InputFormat inputFormat,
+ @Nullable SplitHintSpec splitHintSpec
+ )
+ {
+ return Stream.empty();
+ }
+
+ @Override
+ public int estimateNumSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
+ {
+ return 0;
+ }
+
+ @Override
+ public InputSource withSplit(InputSplit split)
+ {
+ return null;
+ }
+ }
}
diff --git
a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
index 9c3de4922b..ec6d936d14 100644
---
a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
+++
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
-import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
+import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.iceberg.DataFile;
@@ -87,7 +87,7 @@ public class IcebergInputSourceTest
NAMESPACE,
null,
testCatalog,
- new LocalInputSourceBuilder()
+ new LocalInputSourceFactory()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null,
new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -115,7 +115,7 @@ public class IcebergInputSourceTest
}
@Test
- public void testInputSourceWithFilter() throws IOException
+ public void testInputSourceWithEmptySource() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
@@ -128,7 +128,28 @@ public class IcebergInputSourceTest
NAMESPACE,
new IcebergEqualsFilter("id", "0000"),
testCatalog,
- new LocalInputSourceBuilder()
+ new LocalInputSourceFactory()
+ );
+ Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null,
new MaxSizeSplitHintSpec(null, null));
+ Assert.assertEquals(0, splits.count());
+ dropTableFromCatalog(tableIdentifier);
+ }
+
+ @Test
+ public void testInputSourceWithFilter() throws IOException
+ {
+ final File warehouseDir = FileUtils.createTempDir();
+ testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
+ TableIdentifier tableIdentifier =
TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
+
+ createAndLoadTable(tableIdentifier);
+
+ IcebergInputSource inputSource = new IcebergInputSource(
+ TABLENAME,
+ NAMESPACE,
+ new IcebergEqualsFilter("id", "123988"),
+ testCatalog,
+ new LocalInputSourceFactory()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null,
new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -137,7 +158,21 @@ public class IcebergInputSourceTest
.flatMap(List::stream)
.collect(Collectors.toList());
- Assert.assertEquals(0, localInputSourceList.size());
+ Assert.assertEquals(1, inputSource.estimateNumSplits(null, new
MaxSizeSplitHintSpec(1L, null)));
+ Assert.assertEquals(1, localInputSourceList.size());
+ CloseableIterable<Record> datafileReader =
Parquet.read(Files.localInput(localInputSourceList.get(0)))
+ .project(tableSchema)
+
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(
+ tableSchema,
+ fileSchema
+ ))
+ .build();
+
+
+ for (Record record : datafileReader) {
+ Assert.assertEquals(tableData.get("id"), record.get(0));
+ Assert.assertEquals(tableData.get("name"), record.get(1));
+ }
dropTableFromCatalog(tableIdentifier);
}
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
similarity index 86%
rename from
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
rename to
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
index 94b67ca93d..d2b9da3e34 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
@@ -21,20 +21,20 @@ package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.Hdfs;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
-public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
+public class HdfsInputSourceFactory implements InputSourceFactory
{
private final Configuration configuration;
private final HdfsInputSourceConfig inputSourceConfig;
@JsonCreator
- public HdfsInputSourceBuilder(
+ public HdfsInputSourceFactory(
@JacksonInject @Hdfs Configuration configuration,
@JacksonInject HdfsInputSourceConfig inputSourceConfig
)
@@ -44,7 +44,7 @@ public class HdfsInputSourceBuilder extends
AbstractInputSourceBuilder
}
@Override
- public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+ public SplittableInputSource create(List<String> inputFilePaths)
{
return new HdfsInputSource(inputFilePaths, configuration,
inputSourceConfig);
}
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 0923450c5f..e2c79785fe 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -34,8 +34,8 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
-import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceFactory;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import org.apache.hadoop.conf.Configuration;
@@ -67,7 +67,7 @@ public class HdfsStorageDruidModule implements DruidModule
new SimpleModule().registerSubtypes(
new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsInputSource.class,
HdfsStorageDruidModule.SCHEME),
- new NamedType(HdfsInputSourceBuilder.class,
HdfsStorageDruidModule.SCHEME)
+ new NamedType(HdfsInputSourceFactory.class,
HdfsStorageDruidModule.SCHEME)
)
);
}
diff --git
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
index 855a67ac72..863f083c71 100644
---
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
+++
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
@@ -32,7 +32,7 @@ public class HdfsInputSourceAdapterTest
{
Configuration conf = new Configuration();
HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null);
- HdfsInputSourceBuilder hdfsInputSourceAdapter = new
HdfsInputSourceBuilder(conf, inputSourceConfig);
-
Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet",
"hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
+ HdfsInputSourceFactory hdfsInputSourceAdapter = new
HdfsInputSourceFactory(conf, inputSourceConfig);
+
Assert.assertTrue(hdfsInputSourceAdapter.create(Arrays.asList("hdfs://localhost:7020/bar/def.parquet",
"hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
}
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
index b8227d1945..7727c35115 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
@@ -40,7 +40,7 @@ public class S3InputSourceDruidModule implements DruidModule
return ImmutableList.of(
new SimpleModule().registerSubtypes(
new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME),
- new NamedType(S3InputSourceBuilder.class,
S3StorageDruidModule.SCHEME)
+ new NamedType(S3InputSourceFactory.class,
S3StorageDruidModule.SCHEME)
)
);
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
similarity index 94%
rename from
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
rename to
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
index 1755cb041c..d8df28ca6b 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.aws.AWSClientConfig;
import org.apache.druid.common.aws.AWSEndpointConfig;
import org.apache.druid.common.aws.AWSProxyConfig;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.s3.S3InputDataConfig;
@@ -39,7 +39,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;
-public class S3InputSourceBuilder extends AbstractInputSourceBuilder
+public class S3InputSourceFactory implements InputSourceFactory
{
private final ServerSideEncryptingAmazonS3 s3Client;
private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder;
@@ -51,7 +51,7 @@ public class S3InputSourceBuilder extends
AbstractInputSourceBuilder
private final AWSEndpointConfig awsEndpointConfig;
@JsonCreator
- public S3InputSourceBuilder(
+ public S3InputSourceFactory(
@JacksonInject ServerSideEncryptingAmazonS3 s3Client,
@JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
@JacksonInject S3InputDataConfig inputDataConfig,
@@ -73,7 +73,7 @@ public class S3InputSourceBuilder extends
AbstractInputSourceBuilder
}
@Override
- public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+ public SplittableInputSource create(List<String> inputFilePaths)
{
return new S3InputSource(
s3Client,
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
similarity index 90%
rename from
extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
rename to
extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
index bb42975cd6..2304b0f8bf 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.List;
-public class S3InputSourceBuilderTest
+public class S3InputSourceFactoryTest
{
@Test
public void testAdapterGet()
@@ -43,7 +43,7 @@ public class S3InputSourceBuilderTest
"s3://bar/foo/file3.txt"
);
- S3InputSourceBuilder s3Builder = new S3InputSourceBuilder(
+ S3InputSourceFactory s3Builder = new S3InputSourceFactory(
service,
serverSides3Builder,
dataConfig,
@@ -53,6 +53,6 @@ public class S3InputSourceBuilderTest
null,
null
);
- Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof
S3InputSource);
+ Assert.assertTrue(s3Builder.create(fileUris) instanceof S3InputSource);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
deleted file mode 100644
index a0af825c1a..0000000000
---
a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
-import org.apache.druid.data.input.impl.SplittableInputSource;
-import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Stream;
-
-/**
- * A wrapper on top of {@link SplittableInputSource} that handles input source
creation.
- * For composing input sources such as IcebergInputSource, the delegate input
source instantiation might fail upon deserialization since the input file paths
- * are not available yet and this might fail the input source precondition
checks.
- * This adapter helps create the delegate input source once the input file
paths are fully determined.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes(value = {
- @JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value =
LocalInputSourceBuilder.class)
-})
-public abstract class AbstractInputSourceBuilder
-{
- public abstract SplittableInputSource generateInputSource(List<String>
inputFilePaths);
-
- public SplittableInputSource setupInputSource(List<String> inputFilePaths)
- {
- if (inputFilePaths.isEmpty()) {
- return new EmptyInputSource();
- } else {
- return generateInputSource(inputFilePaths);
- }
- }
-
- /**
- * This input source is used in place of a delegate input source if there
are no input file paths.
- * Certain input sources cannot be instantiated with an empty input file
list and so composing input sources such as IcebergInputSource
- * may use this input source as delegate in such cases.
- */
- private static class EmptyInputSource implements SplittableInputSource
- {
- @Override
- public boolean needsFormat()
- {
- return false;
- }
-
- @Override
- public boolean isSplittable()
- {
- return false;
- }
-
- @Override
- public InputSourceReader reader(
- InputRowSchema inputRowSchema,
- @Nullable InputFormat inputFormat,
- File temporaryDirectory
- )
- {
- return new InputSourceReader()
- {
- @Override
- public CloseableIterator<InputRow> read(InputStats inputStats)
- {
- return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
- });
- }
-
- @Override
- public CloseableIterator<InputRowListPlusRawValues> sample()
- {
- return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
- });
- }
- };
- }
-
- @Override
- public Stream<InputSplit> createSplits(
- InputFormat inputFormat,
- @Nullable SplitHintSpec splitHintSpec
- )
- {
- return Stream.empty();
- }
-
- @Override
- public int estimateNumSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
- {
- return 0;
- }
-
- @Override
- public InputSource withSplit(InputSplit split)
- {
- return null;
- }
- }
-}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
b/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
new file mode 100644
index 0000000000..4b298695cd
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.LocalInputSourceFactory;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+import java.util.List;
+
+/**
+ * An interface to generate {@link SplittableInputSource} objects on the fly.
+ * For composing input sources such as IcebergInputSource, the delegate input
source instantiation might fail upon deserialization since the input file paths
+ * are not available yet and this might fail the input source precondition
checks.
+ * This factory helps create the delegate input source once the input file
paths are fully determined.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "local", value = LocalInputSourceFactory.class)
+})
+@UnstableApi
+public interface InputSourceFactory
+{
+ SplittableInputSource create(List<String> inputFilePaths);
+}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
similarity index 81%
rename from
processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
rename to
processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
index 6e2f44b567..b2fa6a13dc 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
@@ -19,18 +19,17 @@
package org.apache.druid.data.input.impl;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
import java.io.File;
import java.util.List;
import java.util.stream.Collectors;
-public class LocalInputSourceBuilder extends AbstractInputSourceBuilder
+public class LocalInputSourceFactory implements InputSourceFactory
{
- public static final String TYPE_KEY = "local";
@Override
- public LocalInputSource generateInputSource(List<String> inputFilePaths)
+ public LocalInputSource create(List<String> inputFilePaths)
{
return new LocalInputSource(
null,
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
index 78a33c05b4..38ba640e49 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
@@ -19,22 +19,12 @@
package org.apache.druid.data.input.impl;
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSplit;
-import org.apache.druid.data.input.SplitHintSpec;
-import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
public class LocalInputSourceAdapterTest
{
@@ -44,44 +34,10 @@ public class LocalInputSourceAdapterTest
@Test
public void testAdapterGet()
{
- LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
-
Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList(
+ LocalInputSourceFactory localInputSourceAdapter = new
LocalInputSourceFactory();
+ Assert.assertTrue(localInputSourceAdapter.create(Arrays.asList(
"foo.parquet",
"bar.parquet"
)) instanceof LocalInputSource);
}
-
- @Test
- public void testAdapterSetup()
- {
- LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
- InputSource delegateInputSource =
localInputSourceAdapter.setupInputSource(Arrays.asList(
- "foo.parquet",
- "bar.parquet"
- ));
- Assert.assertTrue(delegateInputSource instanceof LocalInputSource);
- }
-
- @Test
- public void testEmptyInputSource() throws IOException
- {
- InputFormat mockFormat = EasyMock.createMock(InputFormat.class);
- SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class);
- LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
- SplittableInputSource<Object> emptyInputSource =
- (SplittableInputSource<Object>)
localInputSourceAdapter.setupInputSource(Collections.emptyList());
- List<InputSplit<Object>> splitList = emptyInputSource
- .createSplits(mockFormat, mockSplitHint)
- .collect(Collectors.toList());
- Assert.assertTrue(splitList.isEmpty());
- Assert.assertFalse(emptyInputSource.isSplittable());
- Assert.assertFalse(emptyInputSource.needsFormat());
-
Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class)));
- Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat,
mockSplitHint));
- Assert.assertFalse(emptyInputSource.reader(
- EasyMock.createMock(InputRowSchema.class),
- mockFormat,
- temporaryFolder.newFolder()
- ).read().hasNext());
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]