This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/27.0.0 by this push:
     new c79848de1f [Backport] Core changes for iceberg ingest extension (#14608)
c79848de1f is described below

commit c79848de1fb2fbdba83c05d1a23689f4c75a2ec6
Author: Atul Mohan <[email protected]>
AuthorDate: Tue Jul 18 20:35:16 2023 -0700

    [Backport] Core changes for iceberg ingest extension (#14608)
    
    * Backport core iceberg changes
    
    * Add local input source builder
---
 .../inputsource/hdfs/HdfsInputSourceBuilder.java   |  51 ++++++++
 .../druid/storage/hdfs/HdfsStorageDruidModule.java |   4 +-
 .../hdfs/HdfsInputSourceBuilderTest.java}          |  36 ++----
 .../druid/data/input/s3/S3InputSourceBuilder.java  | 134 +++++++++++++++++++++
 .../data/input/s3/S3InputSourceDruidModule.java    |   3 +-
 .../data/input/s3/S3InputSourceBuilderTest.java    |  58 +++++++++
 .../data/input/AbstractInputSourceBuilder.java     | 123 +++++++++++++++++++
 .../data/input/impl/LocalInputSourceBuilder.java   |  34 ++----
 .../input/impl/LocalInputSourceBuilderTest.java    |  87 +++++++++++++
 9 files changed, 483 insertions(+), 47 deletions(-)

diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
new file mode 100644
index 0000000000..94b67ca93d
--- /dev/null
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.inputsource.hdfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.guice.Hdfs;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
+{
+  private final Configuration configuration;
+  private final HdfsInputSourceConfig inputSourceConfig;
+
+  @JsonCreator
+  public HdfsInputSourceBuilder(
+      @JacksonInject @Hdfs Configuration configuration,
+      @JacksonInject HdfsInputSourceConfig inputSourceConfig
+  )
+  {
+    this.configuration = configuration;
+    this.inputSourceConfig = inputSourceConfig;
+  }
+
+  @Override
+  public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+  {
+    return new HdfsInputSource(inputFilePaths, configuration, 
inputSourceConfig);
+  }
+}
diff --git 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 6a5f96b0d5..0923450c5f 100644
--- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -34,6 +34,7 @@ import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
 import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
@@ -65,7 +66,8 @@ public class HdfsStorageDruidModule implements DruidModule
     return Collections.singletonList(
         new SimpleModule().registerSubtypes(
             new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
-            new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME)
+            new NamedType(HdfsInputSource.class, 
HdfsStorageDruidModule.SCHEME),
+            new NamedType(HdfsInputSourceBuilder.class, 
HdfsStorageDruidModule.SCHEME)
         )
     );
   }
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
similarity index 50%
copy from 
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
copy to 
extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
index 241c7c1640..18414a6690 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++ 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
@@ -17,34 +17,22 @@
  * under the License.
  */
 
-package org.apache.druid.data.input.s3;
+package org.apache.druid.inputsource.hdfs;
 
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
 
-import java.util.List;
+import java.util.Arrays;
 
-/**
- * Druid module to wire up native batch support for S3 input
- */
-public class S3InputSourceDruidModule implements DruidModule
+public class HdfsInputSourceBuilderTest
 {
-  @Override
-  public List<? extends Module> getJacksonModules()
+  @Test
+  public void testAdapterGet()
   {
-    return ImmutableList.of(
-        new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class, 
S3StorageDruidModule.SCHEME))
-    );
-  }
-
-  @Override
-  public void configure(Binder binder)
-  {
-
+    Configuration conf = new Configuration();
+    HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null);
+    HdfsInputSourceBuilder hdfsInputSourceAdapter = new 
HdfsInputSourceBuilder(conf, inputSourceConfig);
+    
Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet",
 "hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
   }
 }
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
new file mode 100644
index 0000000000..1755cb041c
--- /dev/null
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.s3;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.aws.AWSClientConfig;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
+import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.s3.S3InputDataConfig;
+import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class S3InputSourceBuilder extends AbstractInputSourceBuilder
+{
+  private final ServerSideEncryptingAmazonS3 s3Client;
+  private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder;
+  private final S3InputSourceConfig s3InputSourceConfig;
+  private final S3InputDataConfig inputDataConfig;
+  private final AWSCredentialsProvider awsCredentialsProvider;
+  private final AWSProxyConfig awsProxyConfig;
+  private final AWSClientConfig awsClientConfig;
+  private final AWSEndpointConfig awsEndpointConfig;
+
+  @JsonCreator
+  public S3InputSourceBuilder(
+      @JacksonInject ServerSideEncryptingAmazonS3 s3Client,
+      @JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
+      @JacksonInject S3InputDataConfig inputDataConfig,
+      @JacksonInject AWSCredentialsProvider awsCredentialsProvider,
+      @JsonProperty("properties") @Nullable S3InputSourceConfig 
s3InputSourceConfig,
+      @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
+      @JsonProperty("endpointConfig") @Nullable AWSEndpointConfig 
awsEndpointConfig,
+      @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig
+  )
+  {
+    this.s3Client = s3Client;
+    this.s3ClientBuilder = s3ClientBuilder;
+    this.inputDataConfig = inputDataConfig;
+    this.awsCredentialsProvider = awsCredentialsProvider;
+    this.s3InputSourceConfig = s3InputSourceConfig;
+    this.awsProxyConfig = awsProxyConfig;
+    this.awsEndpointConfig = awsEndpointConfig;
+    this.awsClientConfig = awsClientConfig;
+  }
+
+  @Override
+  public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+  {
+    return new S3InputSource(
+        s3Client,
+        s3ClientBuilder,
+        inputDataConfig,
+        awsCredentialsProvider,
+        inputFilePaths.stream().map(chosenPath -> {
+          try {
+            return new URI(StringUtils.replace(chosenPath, "s3a://", "s3://"));
+          }
+          catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+          }
+        }).collect(
+            Collectors.toList()),
+        null,
+        null,
+        null,
+        s3InputSourceConfig,
+        awsProxyConfig,
+        awsEndpointConfig,
+        awsClientConfig
+    );
+  }
+
+  @Nullable
+  @JsonProperty("properties")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public S3InputSourceConfig getS3InputSourceConfig()
+  {
+    return s3InputSourceConfig;
+  }
+
+  @Nullable
+  @JsonProperty("proxyConfig")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public AWSProxyConfig getAwsProxyConfig()
+  {
+    return awsProxyConfig;
+  }
+
+  @Nullable
+  @JsonProperty("clientConfig")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public AWSClientConfig getAwsClientConfig()
+  {
+    return awsClientConfig;
+  }
+
+  @Nullable
+  @JsonProperty("endpointConfig")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public AWSEndpointConfig getAwsEndpointConfig()
+  {
+    return awsEndpointConfig;
+  }
+
+}
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
index 241c7c1640..2d68ffecec 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
@@ -38,7 +38,8 @@ public class S3InputSourceDruidModule implements DruidModule
   public List<? extends Module> getJacksonModules()
   {
     return ImmutableList.of(
-        new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class, 
S3StorageDruidModule.SCHEME))
+        new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class, 
S3StorageDruidModule.SCHEME),
+                                            new 
NamedType(S3InputSourceBuilder.class, S3StorageDruidModule.SCHEME))
     );
   }
 
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
new file mode 100644
index 0000000000..bb42975cd6
--- /dev/null
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.s3;
+
+import org.apache.druid.storage.s3.S3InputDataConfig;
+import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class S3InputSourceBuilderTest
+{
+  @Test
+  public void testAdapterGet()
+  {
+    ServerSideEncryptingAmazonS3.Builder serverSides3Builder =
+        EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class);
+    ServerSideEncryptingAmazonS3 service = 
EasyMock.createMock(ServerSideEncryptingAmazonS3.class);
+    S3InputDataConfig dataConfig = 
EasyMock.createMock(S3InputDataConfig.class);
+    List<String> fileUris = Arrays.asList(
+        "s3://foo/bar/file.csv",
+        "s3://bar/foo/file2.csv",
+        "s3://bar/foo/file3.txt"
+    );
+
+    S3InputSourceBuilder s3Builder = new S3InputSourceBuilder(
+        service,
+        serverSides3Builder,
+        dataConfig,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof 
S3InputSource);
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
 
b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
new file mode 100644
index 0000000000..a0af825c1a
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper on top of {@link SplittableInputSource} that handles input source creation.
+ * For composing input sources such as IcebergInputSource, the delegate input source instantiation might fail upon deserialization since the input file paths
+ * are not available yet and this might fail the input source precondition checks.
+ * This adapter helps create the delegate input source once the input file paths are fully determined.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value = 
LocalInputSourceBuilder.class)
+})
+public abstract class AbstractInputSourceBuilder
+{
+  public abstract SplittableInputSource generateInputSource(List<String> 
inputFilePaths);
+
+  public SplittableInputSource setupInputSource(List<String> inputFilePaths)
+  {
+    if (inputFilePaths.isEmpty()) {
+      return new EmptyInputSource();
+    } else {
+      return generateInputSource(inputFilePaths);
+    }
+  }
+
+  /**
+   * This input source is used in place of a delegate input source if there are no input file paths.
+   * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource
+   * may use this input source as delegate in such cases.
+   */
+  private static class EmptyInputSource implements SplittableInputSource
+  {
+    @Override
+    public boolean needsFormat()
+    {
+      return false;
+    }
+
+    @Override
+    public boolean isSplittable()
+    {
+      return false;
+    }
+
+    @Override
+    public InputSourceReader reader(
+        InputRowSchema inputRowSchema,
+        @Nullable InputFormat inputFormat,
+        File temporaryDirectory
+    )
+    {
+      return new InputSourceReader()
+      {
+        @Override
+        public CloseableIterator<InputRow> read(InputStats inputStats)
+        {
+          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+          });
+        }
+
+        @Override
+        public CloseableIterator<InputRowListPlusRawValues> sample()
+        {
+          return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+          });
+        }
+      };
+    }
+
+    @Override
+    public Stream<InputSplit> createSplits(
+        InputFormat inputFormat,
+        @Nullable SplitHintSpec splitHintSpec
+    )
+    {
+      return Stream.empty();
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable 
SplitHintSpec splitHintSpec)
+    {
+      return 0;
+    }
+
+    @Override
+    public InputSource withSplit(InputSplit split)
+    {
+      return null;
+    }
+  }
+}
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
 
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
similarity index 52%
copy from 
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
copy to 
processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
index 241c7c1640..6e2f44b567 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++ 
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
@@ -17,34 +17,26 @@
  * under the License.
  */
 
-package org.apache.druid.data.input.s3;
+package org.apache.druid.data.input.impl;
 
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.druid.data.input.AbstractInputSourceBuilder;
 
+import java.io.File;
 import java.util.List;
+import java.util.stream.Collectors;
 
-/**
- * Druid module to wire up native batch support for S3 input
- */
-public class S3InputSourceDruidModule implements DruidModule
+public class LocalInputSourceBuilder extends AbstractInputSourceBuilder
 {
-  @Override
-  public List<? extends Module> getJacksonModules()
-  {
-    return ImmutableList.of(
-        new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class, 
S3StorageDruidModule.SCHEME))
-    );
-  }
+  public static final String TYPE_KEY = "local";
 
   @Override
-  public void configure(Binder binder)
+  public LocalInputSource generateInputSource(List<String> inputFilePaths)
   {
-
+    return new LocalInputSource(
+        null,
+        null,
+        inputFilePaths.stream().map(chosenPath -> new 
File(chosenPath)).collect(
+            Collectors.toList())
+    );
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
 
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
new file mode 100644
index 0000000000..b2d0973612
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LocalInputSourceBuilderTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testAdapterGet()
+  {
+    LocalInputSourceBuilder localInputSourceAdapter = new 
LocalInputSourceBuilder();
+    
Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList(
+        "foo.parquet",
+        "bar.parquet"
+    )) instanceof LocalInputSource);
+  }
+
+  @Test
+  public void testAdapterSetup()
+  {
+    LocalInputSourceBuilder localInputSourceAdapter = new 
LocalInputSourceBuilder();
+    InputSource delegateInputSource = 
localInputSourceAdapter.setupInputSource(Arrays.asList(
+        "foo.parquet",
+        "bar.parquet"
+    ));
+    Assert.assertTrue(delegateInputSource instanceof LocalInputSource);
+  }
+
+  @Test
+  public void testEmptyInputSource() throws IOException
+  {
+    InputFormat mockFormat = EasyMock.createMock(InputFormat.class);
+    SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class);
+    LocalInputSourceBuilder localInputSourceAdapter = new 
LocalInputSourceBuilder();
+    SplittableInputSource<Object> emptyInputSource =
+        (SplittableInputSource<Object>) 
localInputSourceAdapter.setupInputSource(Collections.emptyList());
+    List<InputSplit<Object>> splitList = emptyInputSource
+        .createSplits(mockFormat, mockSplitHint)
+        .collect(Collectors.toList());
+    Assert.assertTrue(splitList.isEmpty());
+    Assert.assertFalse(emptyInputSource.isSplittable());
+    Assert.assertFalse(emptyInputSource.needsFormat());
+    
Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class)));
+    Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat, 
mockSplitHint));
+    Assert.assertFalse(emptyInputSource.reader(
+        EasyMock.createMock(InputRowSchema.class),
+        mockFormat,
+        temporaryFolder.newFolder()
+    ).read().hasNext());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to