This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/27.0.0 by this push:
new c79848de1f [Backport] Core changes for iceberg ingest extension (#14608)
c79848de1f is described below
commit c79848de1fb2fbdba83c05d1a23689f4c75a2ec6
Author: Atul Mohan <[email protected]>
AuthorDate: Tue Jul 18 20:35:16 2023 -0700
[Backport] Core changes for iceberg ingest extension (#14608)
* Backport core iceberg changes
* Add local input source builder
---
.../inputsource/hdfs/HdfsInputSourceBuilder.java | 51 ++++++++
.../druid/storage/hdfs/HdfsStorageDruidModule.java | 4 +-
.../hdfs/HdfsInputSourceBuilderTest.java} | 36 ++----
.../druid/data/input/s3/S3InputSourceBuilder.java | 134 +++++++++++++++++++++
.../data/input/s3/S3InputSourceDruidModule.java | 3 +-
.../data/input/s3/S3InputSourceBuilderTest.java | 58 +++++++++
.../data/input/AbstractInputSourceBuilder.java | 123 +++++++++++++++++++
.../data/input/impl/LocalInputSourceBuilder.java | 34 ++----
.../input/impl/LocalInputSourceBuilderTest.java | 87 +++++++++++++
9 files changed, 483 insertions(+), 47 deletions(-)
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
new file mode 100644
index 0000000000..94b67ca93d
--- /dev/null
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.inputsource.hdfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.guice.Hdfs;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.List;
+
+public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
+{
+ private final Configuration configuration;
+ private final HdfsInputSourceConfig inputSourceConfig;
+
+ @JsonCreator
+ public HdfsInputSourceBuilder(
+ @JacksonInject @Hdfs Configuration configuration,
+ @JacksonInject HdfsInputSourceConfig inputSourceConfig
+ )
+ {
+ this.configuration = configuration;
+ this.inputSourceConfig = inputSourceConfig;
+ }
+
+ @Override
+ public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+ {
+ return new HdfsInputSource(inputFilePaths, configuration,
inputSourceConfig);
+ }
+}
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 6a5f96b0d5..0923450c5f 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -34,6 +34,7 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
@@ -65,7 +66,8 @@ public class HdfsStorageDruidModule implements DruidModule
return Collections.singletonList(
new SimpleModule().registerSubtypes(
new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
- new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME)
+ new NamedType(HdfsInputSource.class,
HdfsStorageDruidModule.SCHEME),
+ new NamedType(HdfsInputSourceBuilder.class,
HdfsStorageDruidModule.SCHEME)
)
);
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
similarity index 50%
copy from
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
copy to
extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
index 241c7c1640..18414a6690 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
@@ -17,34 +17,22 @@
* under the License.
*/
-package org.apache.druid.data.input.s3;
+package org.apache.druid.inputsource.hdfs;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
-import java.util.List;
+import java.util.Arrays;
-/**
- * Druid module to wire up native batch support for S3 input
- */
-public class S3InputSourceDruidModule implements DruidModule
+public class HdfsInputSourceBuilderTest
{
- @Override
- public List<? extends Module> getJacksonModules()
+ @Test
+ public void testAdapterGet()
{
- return ImmutableList.of(
- new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class,
S3StorageDruidModule.SCHEME))
- );
- }
-
- @Override
- public void configure(Binder binder)
- {
-
+ Configuration conf = new Configuration();
+ HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null);
+ HdfsInputSourceBuilder hdfsInputSourceAdapter = new
HdfsInputSourceBuilder(conf, inputSourceConfig);
+
Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet",
"hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
}
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
new file mode 100644
index 0000000000..1755cb041c
--- /dev/null
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.s3;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.aws.AWSClientConfig;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
+import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.storage.s3.S3InputDataConfig;
+import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class S3InputSourceBuilder extends AbstractInputSourceBuilder
+{
+ private final ServerSideEncryptingAmazonS3 s3Client;
+ private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder;
+ private final S3InputSourceConfig s3InputSourceConfig;
+ private final S3InputDataConfig inputDataConfig;
+ private final AWSCredentialsProvider awsCredentialsProvider;
+ private final AWSProxyConfig awsProxyConfig;
+ private final AWSClientConfig awsClientConfig;
+ private final AWSEndpointConfig awsEndpointConfig;
+
+ @JsonCreator
+ public S3InputSourceBuilder(
+ @JacksonInject ServerSideEncryptingAmazonS3 s3Client,
+ @JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
+ @JacksonInject S3InputDataConfig inputDataConfig,
+ @JacksonInject AWSCredentialsProvider awsCredentialsProvider,
+ @JsonProperty("properties") @Nullable S3InputSourceConfig
s3InputSourceConfig,
+ @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
+ @JsonProperty("endpointConfig") @Nullable AWSEndpointConfig
awsEndpointConfig,
+ @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig
+ )
+ {
+ this.s3Client = s3Client;
+ this.s3ClientBuilder = s3ClientBuilder;
+ this.inputDataConfig = inputDataConfig;
+ this.awsCredentialsProvider = awsCredentialsProvider;
+ this.s3InputSourceConfig = s3InputSourceConfig;
+ this.awsProxyConfig = awsProxyConfig;
+ this.awsEndpointConfig = awsEndpointConfig;
+ this.awsClientConfig = awsClientConfig;
+ }
+
+ @Override
+ public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+ {
+ return new S3InputSource(
+ s3Client,
+ s3ClientBuilder,
+ inputDataConfig,
+ awsCredentialsProvider,
+ inputFilePaths.stream().map(chosenPath -> {
+ try {
+ return new URI(StringUtils.replace(chosenPath, "s3a://", "s3://"));
+ }
+ catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(
+ Collectors.toList()),
+ null,
+ null,
+ null,
+ s3InputSourceConfig,
+ awsProxyConfig,
+ awsEndpointConfig,
+ awsClientConfig
+ );
+ }
+
+ @Nullable
+ @JsonProperty("properties")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public S3InputSourceConfig getS3InputSourceConfig()
+ {
+ return s3InputSourceConfig;
+ }
+
+ @Nullable
+ @JsonProperty("proxyConfig")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public AWSProxyConfig getAwsProxyConfig()
+ {
+ return awsProxyConfig;
+ }
+
+ @Nullable
+ @JsonProperty("clientConfig")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public AWSClientConfig getAwsClientConfig()
+ {
+ return awsClientConfig;
+ }
+
+ @Nullable
+ @JsonProperty("endpointConfig")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public AWSEndpointConfig getAwsEndpointConfig()
+ {
+ return awsEndpointConfig;
+ }
+
+}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
index 241c7c1640..2d68ffecec 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
@@ -38,7 +38,8 @@ public class S3InputSourceDruidModule implements DruidModule
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
- new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class,
S3StorageDruidModule.SCHEME))
+ new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class,
S3StorageDruidModule.SCHEME),
+ new
NamedType(S3InputSourceBuilder.class, S3StorageDruidModule.SCHEME))
);
}
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
new file mode 100644
index 0000000000..bb42975cd6
--- /dev/null
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.s3;
+
+import org.apache.druid.storage.s3.S3InputDataConfig;
+import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class S3InputSourceBuilderTest
+{
+ @Test
+ public void testAdapterGet()
+ {
+ ServerSideEncryptingAmazonS3.Builder serverSides3Builder =
+ EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class);
+ ServerSideEncryptingAmazonS3 service =
EasyMock.createMock(ServerSideEncryptingAmazonS3.class);
+ S3InputDataConfig dataConfig =
EasyMock.createMock(S3InputDataConfig.class);
+ List<String> fileUris = Arrays.asList(
+ "s3://foo/bar/file.csv",
+ "s3://bar/foo/file2.csv",
+ "s3://bar/foo/file3.txt"
+ );
+
+ S3InputSourceBuilder s3Builder = new S3InputSourceBuilder(
+ service,
+ serverSides3Builder,
+ dataConfig,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof
S3InputSource);
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
new file mode 100644
index 0000000000..a0af825c1a
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper on top of {@link SplittableInputSource} that handles input source creation.
+ * For composing input sources such as IcebergInputSource, the delegate input source instantiation might fail upon deserialization since the input file paths
+ * are not available yet and this might fail the input source precondition checks.
+ * This adapter helps create the delegate input source once the input file paths are fully determined.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value =
LocalInputSourceBuilder.class)
+})
+public abstract class AbstractInputSourceBuilder
+{
+ public abstract SplittableInputSource generateInputSource(List<String>
inputFilePaths);
+
+ public SplittableInputSource setupInputSource(List<String> inputFilePaths)
+ {
+ if (inputFilePaths.isEmpty()) {
+ return new EmptyInputSource();
+ } else {
+ return generateInputSource(inputFilePaths);
+ }
+ }
+
+ /**
+ * This input source is used in place of a delegate input source if there are no input file paths.
+ * Certain input sources cannot be instantiated with an empty input file list and so composing input sources such as IcebergInputSource
+ * may use this input source as delegate in such cases.
+ */
+ private static class EmptyInputSource implements SplittableInputSource
+ {
+ @Override
+ public boolean needsFormat()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputSourceReader reader(
+ InputRowSchema inputRowSchema,
+ @Nullable InputFormat inputFormat,
+ File temporaryDirectory
+ )
+ {
+ return new InputSourceReader()
+ {
+ @Override
+ public CloseableIterator<InputRow> read(InputStats inputStats)
+ {
+ return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+ });
+ }
+
+ @Override
+ public CloseableIterator<InputRowListPlusRawValues> sample()
+ {
+ return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
+ });
+ }
+ };
+ }
+
+ @Override
+ public Stream<InputSplit> createSplits(
+ InputFormat inputFormat,
+ @Nullable SplitHintSpec splitHintSpec
+ )
+ {
+ return Stream.empty();
+ }
+
+ @Override
+ public int estimateNumSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
+ {
+ return 0;
+ }
+
+ @Override
+ public InputSource withSplit(InputSplit split)
+ {
+ return null;
+ }
+ }
+}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
similarity index 52%
copy from
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
copy to
processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
index 241c7c1640..6e2f44b567 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
@@ -17,34 +17,26 @@
* under the License.
*/
-package org.apache.druid.data.input.s3;
+package org.apache.druid.data.input.impl;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.storage.s3.S3StorageDruidModule;
+import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import java.io.File;
import java.util.List;
+import java.util.stream.Collectors;
-/**
- * Druid module to wire up native batch support for S3 input
- */
-public class S3InputSourceDruidModule implements DruidModule
+public class LocalInputSourceBuilder extends AbstractInputSourceBuilder
{
- @Override
- public List<? extends Module> getJacksonModules()
- {
- return ImmutableList.of(
- new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class,
S3StorageDruidModule.SCHEME))
- );
- }
+ public static final String TYPE_KEY = "local";
@Override
- public void configure(Binder binder)
+ public LocalInputSource generateInputSource(List<String> inputFilePaths)
{
-
+ return new LocalInputSource(
+ null,
+ null,
+ inputFilePaths.stream().map(chosenPath -> new
File(chosenPath)).collect(
+ Collectors.toList())
+ );
}
}
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
new file mode 100644
index 0000000000..b2d0973612
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class LocalInputSourceBuilderTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testAdapterGet()
+ {
+ LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
+
Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList(
+ "foo.parquet",
+ "bar.parquet"
+ )) instanceof LocalInputSource);
+ }
+
+ @Test
+ public void testAdapterSetup()
+ {
+ LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
+ InputSource delegateInputSource =
localInputSourceAdapter.setupInputSource(Arrays.asList(
+ "foo.parquet",
+ "bar.parquet"
+ ));
+ Assert.assertTrue(delegateInputSource instanceof LocalInputSource);
+ }
+
+ @Test
+ public void testEmptyInputSource() throws IOException
+ {
+ InputFormat mockFormat = EasyMock.createMock(InputFormat.class);
+ SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class);
+ LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
+ SplittableInputSource<Object> emptyInputSource =
+ (SplittableInputSource<Object>)
localInputSourceAdapter.setupInputSource(Collections.emptyList());
+ List<InputSplit<Object>> splitList = emptyInputSource
+ .createSplits(mockFormat, mockSplitHint)
+ .collect(Collectors.toList());
+ Assert.assertTrue(splitList.isEmpty());
+ Assert.assertFalse(emptyInputSource.isSplittable());
+ Assert.assertFalse(emptyInputSource.needsFormat());
+
Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class)));
+ Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat,
mockSplitHint));
+ Assert.assertFalse(emptyInputSource.reader(
+ EasyMock.createMock(InputRowSchema.class),
+ mockFormat,
+ temporaryFolder.newFolder()
+ ).read().hasNext());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]