This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new efb32810c4 Clean up the core API required for Iceberg extension 
(#14614)
efb32810c4 is described below

commit efb32810c48fd9c0bf81cefb15179f74a4cde661
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Fri Jul 21 13:01:33 2023 +0530

    Clean up the core API required for Iceberg extension (#14614)
    
    Changes:
    - Replace `AbstractInputSourceBuilder` with `InputSourceFactory`
    - Move iceberg specific logic to `IcebergInputSource`
---
 .../druid/iceberg/input/IcebergInputSource.java    |  84 +++++++++++++-
 .../iceberg/input/IcebergInputSourceTest.java      |  45 +++++++-
 ...rceBuilder.java => HdfsInputSourceFactory.java} |   8 +-
 .../druid/storage/hdfs/HdfsStorageDruidModule.java |   4 +-
 .../hdfs/HdfsInputSourceAdapterTest.java           |   4 +-
 .../data/input/s3/S3InputSourceDruidModule.java    |   2 +-
 ...ourceBuilder.java => S3InputSourceFactory.java} |   8 +-
 ...lderTest.java => S3InputSourceFactoryTest.java} |   6 +-
 .../data/input/AbstractInputSourceBuilder.java     | 123 ---------------------
 .../druid/data/input/InputSourceFactory.java       |  44 ++++++++
 ...ceBuilder.java => LocalInputSourceFactory.java} |   7 +-
 .../input/impl/LocalInputSourceAdapterTest.java    |  48 +-------
 12 files changed, 185 insertions(+), 198 deletions(-)

diff --git 
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
 
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
index 81899703dc..1d7fc689b2 100644
--- 
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
+++ 
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
@@ -22,19 +22,25 @@ package org.apache.druid.iceberg.input;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceFactory;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
 
@@ -60,7 +66,7 @@ public class IcebergInputSource implements 
SplittableInputSource<List<String>>
   private IcebergFilter icebergFilter;
 
   @JsonProperty
-  private AbstractInputSourceBuilder warehouseSource;
+  private InputSourceFactory warehouseSource;
 
   private boolean isLoaded = false;
 
@@ -72,7 +78,7 @@ public class IcebergInputSource implements 
SplittableInputSource<List<String>>
       @JsonProperty("namespace") String namespace,
       @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
       @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
-      @JsonProperty("warehouseSource") AbstractInputSourceBuilder 
warehouseSource
+      @JsonProperty("warehouseSource") InputSourceFactory warehouseSource
   )
   {
     this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot 
be null");
@@ -170,7 +176,77 @@ public class IcebergInputSource implements 
SplittableInputSource<List<String>>
         getTableName(),
         getIcebergFilter()
     );
-    delegateInputSource = warehouseSource.setupInputSource(snapshotDataFiles);
+    if (snapshotDataFiles.isEmpty()) {
+      delegateInputSource = new EmptyInputSource();
+    } else {
+      delegateInputSource = warehouseSource.create(snapshotDataFiles);
+    }
     isLoaded = true;
   }
+
+  /**
+   * This input source is used in place of a delegate input source if there 
are no input file paths.
+   * Certain input sources cannot be instantiated with an empty input file 
list and so composing input sources such as IcebergInputSource
+   * may use this input source as delegate in such cases.
+   */
+  private static class EmptyInputSource implements SplittableInputSource
+  {
+    @Override
+    public boolean needsFormat()
+    {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable()
+    {
+      return false;
+    }
+
+    @Override
+    public InputSourceReader reader(
+        InputRowSchema inputRowSchema,
+        @Nullable InputFormat inputFormat,
+        File temporaryDirectory
+    )
+    {
+      return new InputSourceReader()
+      {
+        @Override
+        public CloseableIterator<InputRow> read(InputStats inputStats)
+        {
+          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+          });
+        }
+
+        @Override
+        public CloseableIterator<InputRowListPlusRawValues> sample()
+        {
+          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+          });
+        }
+      };
+    }
+
+    @Override
+    public Stream<InputSplit> createSplits(
+        InputFormat inputFormat,
+        @Nullable SplitHintSpec splitHintSpec
+    )
+    {
+      return Stream.empty();
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable 
SplitHintSpec splitHintSpec)
+    {
+      return 0;
+    }
+
+    @Override
+    public InputSource withSplit(InputSplit split)
+    {
+      return null;
+    }
+  }
 }
diff --git 
a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
 
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
index 9c3de4922b..ec6d936d14 100644
--- 
a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
+++ 
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
@@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.LocalInputSource;
-import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
+import org.apache.druid.data.input.impl.LocalInputSourceFactory;
 import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.iceberg.DataFile;
@@ -87,7 +87,7 @@ public class IcebergInputSourceTest
         NAMESPACE,
         null,
         testCatalog,
-        new LocalInputSourceBuilder()
+        new LocalInputSourceFactory()
     );
     Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, 
new MaxSizeSplitHintSpec(null, null));
     List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -115,7 +115,7 @@ public class IcebergInputSourceTest
   }
 
   @Test
-  public void testInputSourceWithFilter() throws IOException
+  public void testInputSourceWithEmptySource() throws IOException
   {
     final File warehouseDir = FileUtils.createTempDir();
     testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
@@ -128,7 +128,28 @@ public class IcebergInputSourceTest
         NAMESPACE,
         new IcebergEqualsFilter("id", "0000"),
         testCatalog,
-        new LocalInputSourceBuilder()
+        new LocalInputSourceFactory()
+    );
+    Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, 
new MaxSizeSplitHintSpec(null, null));
+    Assert.assertEquals(0, splits.count());
+    dropTableFromCatalog(tableIdentifier);
+  }
+
+  @Test
+  public void testInputSourceWithFilter() throws IOException
+  {
+    final File warehouseDir = FileUtils.createTempDir();
+    testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
+    TableIdentifier tableIdentifier = 
TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
+
+    createAndLoadTable(tableIdentifier);
+
+    IcebergInputSource inputSource = new IcebergInputSource(
+        TABLENAME,
+        NAMESPACE,
+        new IcebergEqualsFilter("id", "123988"),
+        testCatalog,
+        new LocalInputSourceFactory()
     );
     Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, 
new MaxSizeSplitHintSpec(null, null));
     List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -137,7 +158,21 @@ public class IcebergInputSourceTest
                                             .flatMap(List::stream)
                                             .collect(Collectors.toList());
 
-    Assert.assertEquals(0, localInputSourceList.size());
+    Assert.assertEquals(1, inputSource.estimateNumSplits(null, new 
MaxSizeSplitHintSpec(1L, null)));
+    Assert.assertEquals(1, localInputSourceList.size());
+    CloseableIterable<Record> datafileReader = 
Parquet.read(Files.localInput(localInputSourceList.get(0)))
+                                                      .project(tableSchema)
+                                                      
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(
+                                                          tableSchema,
+                                                          fileSchema
+                                                      ))
+                                                      .build();
+
+
+    for (Record record : datafileReader) {
+      Assert.assertEquals(tableData.get("id"), record.get(0));
+      Assert.assertEquals(tableData.get("name"), record.get(1));
+    }
     dropTableFromCatalog(tableIdentifier);
   }
 
diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
similarity index 86%
rename from 
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
rename to 
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
index 94b67ca93d..d2b9da3e34 100644
--- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
@@ -21,20 +21,20 @@ package org.apache.druid.inputsource.hdfs;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.Hdfs;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
 
-public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
+public class HdfsInputSourceFactory implements InputSourceFactory
 {
   private final Configuration configuration;
   private final HdfsInputSourceConfig inputSourceConfig;
 
   @JsonCreator
-  public HdfsInputSourceBuilder(
+  public HdfsInputSourceFactory(
       @JacksonInject @Hdfs Configuration configuration,
       @JacksonInject HdfsInputSourceConfig inputSourceConfig
   )
@@ -44,7 +44,7 @@ public class HdfsInputSourceBuilder extends 
AbstractInputSourceBuilder
   }
 
   @Override
-  public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+  public SplittableInputSource create(List<String> inputFilePaths)
   {
     return new HdfsInputSource(inputFilePaths, configuration, 
inputSourceConfig);
   }
diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 0923450c5f..e2c79785fe 100644
--- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -34,8 +34,8 @@ import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
-import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
 import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceFactory;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -67,7 +67,7 @@ public class HdfsStorageDruidModule implements DruidModule
         new SimpleModule().registerSubtypes(
             new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
             new NamedType(HdfsInputSource.class, 
HdfsStorageDruidModule.SCHEME),
-            new NamedType(HdfsInputSourceBuilder.class, 
HdfsStorageDruidModule.SCHEME)
+            new NamedType(HdfsInputSourceFactory.class, 
HdfsStorageDruidModule.SCHEME)
         )
     );
   }
diff --git 
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
index 855a67ac72..863f083c71 100644
--- 
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
+++ 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceAdapterTest.java
@@ -32,7 +32,7 @@ public class HdfsInputSourceAdapterTest
   {
     Configuration conf = new Configuration();
     HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null);
-    HdfsInputSourceBuilder hdfsInputSourceAdapter = new 
HdfsInputSourceBuilder(conf, inputSourceConfig);
-    
Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet",
 "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
+    HdfsInputSourceFactory hdfsInputSourceAdapter = new 
HdfsInputSourceFactory(conf, inputSourceConfig);
+    
Assert.assertTrue(hdfsInputSourceAdapter.create(Arrays.asList("hdfs://localhost:7020/bar/def.parquet",
 "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
   }
 }
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
index b8227d1945..7727c35115 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
@@ -40,7 +40,7 @@ public class S3InputSourceDruidModule implements DruidModule
     return ImmutableList.of(
         new SimpleModule().registerSubtypes(
             new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME),
-            new NamedType(S3InputSourceBuilder.class, 
S3StorageDruidModule.SCHEME)
+            new NamedType(S3InputSourceFactory.class, 
S3StorageDruidModule.SCHEME)
         )
     );
   }
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
similarity index 94%
rename from 
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
rename to 
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
index 1755cb041c..d8df28ca6b 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.common.aws.AWSClientConfig;
 import org.apache.druid.common.aws.AWSEndpointConfig;
 import org.apache.druid.common.aws.AWSProxyConfig;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.storage.s3.S3InputDataConfig;
@@ -39,7 +39,7 @@ import java.net.URISyntaxException;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class S3InputSourceBuilder extends AbstractInputSourceBuilder
+public class S3InputSourceFactory implements InputSourceFactory
 {
   private final ServerSideEncryptingAmazonS3 s3Client;
   private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder;
@@ -51,7 +51,7 @@ public class S3InputSourceBuilder extends 
AbstractInputSourceBuilder
   private final AWSEndpointConfig awsEndpointConfig;
 
   @JsonCreator
-  public S3InputSourceBuilder(
+  public S3InputSourceFactory(
       @JacksonInject ServerSideEncryptingAmazonS3 s3Client,
       @JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
       @JacksonInject S3InputDataConfig inputDataConfig,
@@ -73,7 +73,7 @@ public class S3InputSourceBuilder extends 
AbstractInputSourceBuilder
   }
 
   @Override
-  public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+  public SplittableInputSource create(List<String> inputFilePaths)
   {
     return new S3InputSource(
         s3Client,
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
similarity index 90%
rename from 
extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
rename to 
extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
index bb42975cd6..2304b0f8bf 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.List;
 
-public class S3InputSourceBuilderTest
+public class S3InputSourceFactoryTest
 {
   @Test
   public void testAdapterGet()
@@ -43,7 +43,7 @@ public class S3InputSourceBuilderTest
         "s3://bar/foo/file3.txt"
     );
 
-    S3InputSourceBuilder s3Builder = new S3InputSourceBuilder(
+    S3InputSourceFactory s3Builder = new S3InputSourceFactory(
         service,
         serverSides3Builder,
         dataConfig,
@@ -53,6 +53,6 @@ public class S3InputSourceBuilderTest
         null,
         null
     );
-    Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof 
S3InputSource);
+    Assert.assertTrue(s3Builder.create(fileUris) instanceof S3InputSource);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
 
b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
deleted file mode 100644
index a0af825c1a..0000000000
--- 
a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
-import org.apache.druid.data.input.impl.SplittableInputSource;
-import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Stream;
-
-/**
- * A wrapper on top of {@link SplittableInputSource} that handles input source 
creation.
- * For composing input sources such as IcebergInputSource, the delegate input 
source instantiation might fail upon deserialization since the input file paths
- * are not available yet and this might fail the input source precondition 
checks.
- * This adapter helps create the delegate input source once the input file 
paths are fully determined.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value = 
LocalInputSourceBuilder.class)
-})
-public abstract class AbstractInputSourceBuilder
-{
-  public abstract SplittableInputSource generateInputSource(List<String> 
inputFilePaths);
-
-  public SplittableInputSource setupInputSource(List<String> inputFilePaths)
-  {
-    if (inputFilePaths.isEmpty()) {
-      return new EmptyInputSource();
-    } else {
-      return generateInputSource(inputFilePaths);
-    }
-  }
-
-  /**
-   * This input source is used in place of a delegate input source if there 
are no input file paths.
-   * Certain input sources cannot be instantiated with an empty input file 
list and so composing input sources such as IcebergInputSource
-   * may use this input source as delegate in such cases.
-   */
-  private static class EmptyInputSource implements SplittableInputSource
-  {
-    @Override
-    public boolean needsFormat()
-    {
-      return false;
-    }
-
-    @Override
-    public boolean isSplittable()
-    {
-      return false;
-    }
-
-    @Override
-    public InputSourceReader reader(
-        InputRowSchema inputRowSchema,
-        @Nullable InputFormat inputFormat,
-        File temporaryDirectory
-    )
-    {
-      return new InputSourceReader()
-      {
-        @Override
-        public CloseableIterator<InputRow> read(InputStats inputStats)
-        {
-          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
-          });
-        }
-
-        @Override
-        public CloseableIterator<InputRowListPlusRawValues> sample()
-        {
-          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
-          });
-        }
-      };
-    }
-
-    @Override
-    public Stream<InputSplit> createSplits(
-        InputFormat inputFormat,
-        @Nullable SplitHintSpec splitHintSpec
-    )
-    {
-      return Stream.empty();
-    }
-
-    @Override
-    public int estimateNumSplits(InputFormat inputFormat, @Nullable 
SplitHintSpec splitHintSpec)
-    {
-      return 0;
-    }
-
-    @Override
-    public InputSource withSplit(InputSplit split)
-    {
-      return null;
-    }
-  }
-}
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java 
b/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
new file mode 100644
index 0000000000..4b298695cd
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.LocalInputSourceFactory;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+import java.util.List;
+
+/**
+ * An interface to generate {@link SplittableInputSource} objects on the fly.
+ * For composing input sources such as IcebergInputSource, the delegate input 
source instantiation might fail upon deserialization since the input file paths
+ * are not available yet and this might fail the input source precondition 
checks.
+ * This factory helps create the delegate input source once the input file 
paths are fully determined.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "local", value = LocalInputSourceFactory.class)
+})
+@UnstableApi
+public interface InputSourceFactory
+{
+  SplittableInputSource create(List<String> inputFilePaths);
+}
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
similarity index 81%
rename from 
processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
rename to 
processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
index 6e2f44b567..b2fa6a13dc 100644
--- 
a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
@@ -19,18 +19,17 @@
 
 package org.apache.druid.data.input.impl;
 
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
 
 import java.io.File;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class LocalInputSourceBuilder extends AbstractInputSourceBuilder
+public class LocalInputSourceFactory implements InputSourceFactory
 {
-  public static final String TYPE_KEY = "local";
 
   @Override
-  public LocalInputSource generateInputSource(List<String> inputFilePaths)
+  public LocalInputSource create(List<String> inputFilePaths)
   {
     return new LocalInputSource(
         null,
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
 
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
index 78a33c05b4..38ba640e49 100644
--- 
a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceAdapterTest.java
@@ -19,22 +19,12 @@
 
 package org.apache.druid.data.input.impl;
 
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSplit;
-import org.apache.druid.data.input.SplitHintSpec;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
 
 public class LocalInputSourceAdapterTest
 {
@@ -44,44 +34,10 @@ public class LocalInputSourceAdapterTest
   @Test
   public void testAdapterGet()
   {
-    LocalInputSourceBuilder localInputSourceAdapter = new 
LocalInputSourceBuilder();
-    
Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList(
+    LocalInputSourceFactory localInputSourceAdapter = new 
LocalInputSourceFactory();
+    Assert.assertTrue(localInputSourceAdapter.create(Arrays.asList(
         "foo.parquet",
         "bar.parquet"
     )) instanceof LocalInputSource);
   }
-
-  @Test
-  public void testAdapterSetup()
-  {
-    LocalInputSourceBuilder localInputSourceAdapter = new 
LocalInputSourceBuilder();
-    InputSource delegateInputSource = 
localInputSourceAdapter.setupInputSource(Arrays.asList(
-        "foo.parquet",
-        "bar.parquet"
-    ));
-    Assert.assertTrue(delegateInputSource instanceof LocalInputSource);
-  }
-
-  @Test
-  public void testEmptyInputSource() throws IOException
-  {
-    InputFormat mockFormat = EasyMock.createMock(InputFormat.class);
-    SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class);
-    LocalInputSourceBuilder localInputSourceAdapter = new 
LocalInputSourceBuilder();
-    SplittableInputSource<Object> emptyInputSource =
-        (SplittableInputSource<Object>) 
localInputSourceAdapter.setupInputSource(Collections.emptyList());
-    List<InputSplit<Object>> splitList = emptyInputSource
-        .createSplits(mockFormat, mockSplitHint)
-        .collect(Collectors.toList());
-    Assert.assertTrue(splitList.isEmpty());
-    Assert.assertFalse(emptyInputSource.isSplittable());
-    Assert.assertFalse(emptyInputSource.needsFormat());
-    
Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class)));
-    Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat, 
mockSplitHint));
-    Assert.assertFalse(emptyInputSource.reader(
-        EasyMock.createMock(InputRowSchema.class),
-        mockFormat,
-        temporaryFolder.newFolder()
-    ).read().hasNext());
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to