This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/27.0.0 by this push:
new dab782965b [Backport] Clean up the core API required for Iceberg
extension (#14719)
dab782965b is described below
commit dab782965b9a2ce021ee09e70fbf777abc353c74
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Aug 1 17:19:10 2023 +0530
[Backport] Clean up the core API required for Iceberg extension (#14719)
* Clean up the core API required for Iceberg extension (#14614)
Changes:
- Replace `AbstractInputSourceBuilder` with `InputSourceFactory`
- Move iceberg specific logic to `IcebergInputSource`
* Resolve conflict
* Use generic class in tests
---------
Co-authored-by: Abhishek Agarwal
<[email protected]>
---
...rceBuilder.java => HdfsInputSourceFactory.java} | 8 +-
.../druid/storage/hdfs/HdfsStorageDruidModule.java | 4 +-
.../hdfs/HdfsInputSourceBuilderTest.java | 5 +-
.../data/input/s3/S3InputSourceDruidModule.java | 6 +-
...ourceBuilder.java => S3InputSourceFactory.java} | 8 +-
...lderTest.java => S3InputSourceFactoryTest.java} | 7 +-
.../data/input/AbstractInputSourceBuilder.java | 123 ---------------------
.../druid/data/input/InputSourceFactory.java | 44 ++++++++
...ceBuilder.java => LocalInputSourceFactory.java} | 7 +-
.../input/impl/LocalInputSourceBuilderTest.java | 49 +-------
10 files changed, 71 insertions(+), 190 deletions(-)
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
similarity index 86%
rename from
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
rename to
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
index 94b67ca93d..d2b9da3e34 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilder.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceFactory.java
@@ -21,20 +21,20 @@ package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.Hdfs;
import org.apache.hadoop.conf.Configuration;
import java.util.List;
-public class HdfsInputSourceBuilder extends AbstractInputSourceBuilder
+public class HdfsInputSourceFactory implements InputSourceFactory
{
private final Configuration configuration;
private final HdfsInputSourceConfig inputSourceConfig;
@JsonCreator
- public HdfsInputSourceBuilder(
+ public HdfsInputSourceFactory(
@JacksonInject @Hdfs Configuration configuration,
@JacksonInject HdfsInputSourceConfig inputSourceConfig
)
@@ -44,7 +44,7 @@ public class HdfsInputSourceBuilder extends
AbstractInputSourceBuilder
}
@Override
- public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+ public SplittableInputSource create(List<String> inputFilePaths)
{
return new HdfsInputSource(inputFilePaths, configuration,
inputSourceConfig);
}
diff --git
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 0923450c5f..e2c79785fe 100644
---
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -34,8 +34,8 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
-import org.apache.druid.inputsource.hdfs.HdfsInputSourceBuilder;
import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig;
+import org.apache.druid.inputsource.hdfs.HdfsInputSourceFactory;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import org.apache.hadoop.conf.Configuration;
@@ -67,7 +67,7 @@ public class HdfsStorageDruidModule implements DruidModule
new SimpleModule().registerSubtypes(
new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsInputSource.class,
HdfsStorageDruidModule.SCHEME),
- new NamedType(HdfsInputSourceBuilder.class,
HdfsStorageDruidModule.SCHEME)
+ new NamedType(HdfsInputSourceFactory.class,
HdfsStorageDruidModule.SCHEME)
)
);
}
diff --git
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
index 18414a6690..a5fd8d6873 100644
---
a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
+++
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceBuilderTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.inputsource.hdfs;
+import org.apache.druid.data.input.InputSourceFactory;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
@@ -32,7 +33,7 @@ public class HdfsInputSourceBuilderTest
{
Configuration conf = new Configuration();
HdfsInputSourceConfig inputSourceConfig = new HdfsInputSourceConfig(null);
- HdfsInputSourceBuilder hdfsInputSourceAdapter = new
HdfsInputSourceBuilder(conf, inputSourceConfig);
-
Assert.assertTrue(hdfsInputSourceAdapter.generateInputSource(Arrays.asList("hdfs://localhost:7020/bar/def.parquet",
"hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
+ InputSourceFactory hdfsInputSourceAdapter = new
HdfsInputSourceFactory(conf, inputSourceConfig);
+
Assert.assertTrue(hdfsInputSourceAdapter.create(Arrays.asList("hdfs://localhost:7020/bar/def.parquet",
"hdfs://localhost:7020/bar/abc.parquet")) instanceof HdfsInputSource);
}
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
index 2d68ffecec..7727c35115 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java
@@ -38,8 +38,10 @@ public class S3InputSourceDruidModule implements DruidModule
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
- new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class,
S3StorageDruidModule.SCHEME),
- new
NamedType(S3InputSourceBuilder.class, S3StorageDruidModule.SCHEME))
+ new SimpleModule().registerSubtypes(
+ new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME),
+ new NamedType(S3InputSourceFactory.class,
S3StorageDruidModule.SCHEME)
+ )
);
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
similarity index 94%
rename from
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
rename to
extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
index 1755cb041c..d8df28ca6b 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceBuilder.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceFactory.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.aws.AWSClientConfig;
import org.apache.druid.common.aws.AWSEndpointConfig;
import org.apache.druid.common.aws.AWSProxyConfig;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.s3.S3InputDataConfig;
@@ -39,7 +39,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;
-public class S3InputSourceBuilder extends AbstractInputSourceBuilder
+public class S3InputSourceFactory implements InputSourceFactory
{
private final ServerSideEncryptingAmazonS3 s3Client;
private final ServerSideEncryptingAmazonS3.Builder s3ClientBuilder;
@@ -51,7 +51,7 @@ public class S3InputSourceBuilder extends
AbstractInputSourceBuilder
private final AWSEndpointConfig awsEndpointConfig;
@JsonCreator
- public S3InputSourceBuilder(
+ public S3InputSourceFactory(
@JacksonInject ServerSideEncryptingAmazonS3 s3Client,
@JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
@JacksonInject S3InputDataConfig inputDataConfig,
@@ -73,7 +73,7 @@ public class S3InputSourceBuilder extends
AbstractInputSourceBuilder
}
@Override
- public SplittableInputSource generateInputSource(List<String> inputFilePaths)
+ public SplittableInputSource create(List<String> inputFilePaths)
{
return new S3InputSource(
s3Client,
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
similarity index 88%
rename from
extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
rename to
extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
index bb42975cd6..4247a20f0a 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceBuilderTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceFactoryTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.data.input.s3;
+import org.apache.druid.data.input.InputSourceFactory;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.easymock.EasyMock;
@@ -28,7 +29,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.List;
-public class S3InputSourceBuilderTest
+public class S3InputSourceFactoryTest
{
@Test
public void testAdapterGet()
@@ -43,7 +44,7 @@ public class S3InputSourceBuilderTest
"s3://bar/foo/file3.txt"
);
- S3InputSourceBuilder s3Builder = new S3InputSourceBuilder(
+ InputSourceFactory s3Builder = new S3InputSourceFactory(
service,
serverSides3Builder,
dataConfig,
@@ -53,6 +54,6 @@ public class S3InputSourceBuilderTest
null,
null
);
- Assert.assertTrue(s3Builder.generateInputSource(fileUris) instanceof
S3InputSource);
+ Assert.assertTrue(s3Builder.create(fileUris) instanceof S3InputSource);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
b/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
deleted file mode 100644
index a0af825c1a..0000000000
---
a/processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceBuilder.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.data.input.impl.LocalInputSourceBuilder;
-import org.apache.druid.data.input.impl.SplittableInputSource;
-import org.apache.druid.java.util.common.CloseableIterators;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Stream;
-
-/**
- * A wrapper on top of {@link SplittableInputSource} that handles input source
creation.
- * For composing input sources such as IcebergInputSource, the delegate input
source instantiation might fail upon deserialization since the input file paths
- * are not available yet and this might fail the input source precondition
checks.
- * This adapter helps create the delegate input source once the input file
paths are fully determined.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes(value = {
- @JsonSubTypes.Type(name = LocalInputSourceBuilder.TYPE_KEY, value =
LocalInputSourceBuilder.class)
-})
-public abstract class AbstractInputSourceBuilder
-{
- public abstract SplittableInputSource generateInputSource(List<String>
inputFilePaths);
-
- public SplittableInputSource setupInputSource(List<String> inputFilePaths)
- {
- if (inputFilePaths.isEmpty()) {
- return new EmptyInputSource();
- } else {
- return generateInputSource(inputFilePaths);
- }
- }
-
- /**
- * This input source is used in place of a delegate input source if there
are no input file paths.
- * Certain input sources cannot be instantiated with an empty input file
list and so composing input sources such as IcebergInputSource
- * may use this input source as delegate in such cases.
- */
- private static class EmptyInputSource implements SplittableInputSource
- {
- @Override
- public boolean needsFormat()
- {
- return false;
- }
-
- @Override
- public boolean isSplittable()
- {
- return false;
- }
-
- @Override
- public InputSourceReader reader(
- InputRowSchema inputRowSchema,
- @Nullable InputFormat inputFormat,
- File temporaryDirectory
- )
- {
- return new InputSourceReader()
- {
- @Override
- public CloseableIterator<InputRow> read(InputStats inputStats)
- {
- return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
- });
- }
-
- @Override
- public CloseableIterator<InputRowListPlusRawValues> sample()
- {
- return CloseableIterators.wrap(Collections.emptyIterator(), () -> {
- });
- }
- };
- }
-
- @Override
- public Stream<InputSplit> createSplits(
- InputFormat inputFormat,
- @Nullable SplitHintSpec splitHintSpec
- )
- {
- return Stream.empty();
- }
-
- @Override
- public int estimateNumSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
- {
- return 0;
- }
-
- @Override
- public InputSource withSplit(InputSplit split)
- {
- return null;
- }
- }
-}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
b/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
new file mode 100644
index 0000000000..4b298695cd
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/data/input/InputSourceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.LocalInputSourceFactory;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.guice.annotations.UnstableApi;
+
+import java.util.List;
+
+/**
+ * An interface to generate {@link SplittableInputSource} objects on the fly.
+ * For composing input sources such as IcebergInputSource, the delegate input
source instantiation might fail upon deserialization since the input file paths
+ * are not available yet and this might fail the input source precondition
checks.
+ * This factory helps create the delegate input source once the input file
paths are fully determined.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "local", value = LocalInputSourceFactory.class)
+})
+@UnstableApi
+public interface InputSourceFactory
+{
+ SplittableInputSource create(List<String> inputFilePaths);
+}
diff --git
a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
similarity index 81%
rename from
processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
rename to
processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
index 6e2f44b567..b2fa6a13dc 100644
---
a/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceBuilder.java
+++
b/processing/src/main/java/org/apache/druid/data/input/impl/LocalInputSourceFactory.java
@@ -19,18 +19,17 @@
package org.apache.druid.data.input.impl;
-import org.apache.druid.data.input.AbstractInputSourceBuilder;
+import org.apache.druid.data.input.InputSourceFactory;
import java.io.File;
import java.util.List;
import java.util.stream.Collectors;
-public class LocalInputSourceBuilder extends AbstractInputSourceBuilder
+public class LocalInputSourceFactory implements InputSourceFactory
{
- public static final String TYPE_KEY = "local";
@Override
- public LocalInputSource generateInputSource(List<String> inputFilePaths)
+ public LocalInputSource create(List<String> inputFilePaths)
{
return new LocalInputSource(
null,
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
index b2d0973612..cee7c06795 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/LocalInputSourceBuilderTest.java
@@ -19,22 +19,13 @@
package org.apache.druid.data.input.impl;
-import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.InputSplit;
-import org.apache.druid.data.input.SplitHintSpec;
-import org.easymock.EasyMock;
+import org.apache.druid.data.input.InputSourceFactory;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
public class LocalInputSourceBuilderTest
{
@@ -44,44 +35,10 @@ public class LocalInputSourceBuilderTest
@Test
public void testAdapterGet()
{
- LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
-
Assert.assertTrue(localInputSourceAdapter.generateInputSource(Arrays.asList(
+ InputSourceFactory localInputSourceAdapter = new LocalInputSourceFactory();
+ Assert.assertTrue(localInputSourceAdapter.create(Arrays.asList(
"foo.parquet",
"bar.parquet"
)) instanceof LocalInputSource);
}
-
- @Test
- public void testAdapterSetup()
- {
- LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
- InputSource delegateInputSource =
localInputSourceAdapter.setupInputSource(Arrays.asList(
- "foo.parquet",
- "bar.parquet"
- ));
- Assert.assertTrue(delegateInputSource instanceof LocalInputSource);
- }
-
- @Test
- public void testEmptyInputSource() throws IOException
- {
- InputFormat mockFormat = EasyMock.createMock(InputFormat.class);
- SplitHintSpec mockSplitHint = EasyMock.createMock(SplitHintSpec.class);
- LocalInputSourceBuilder localInputSourceAdapter = new
LocalInputSourceBuilder();
- SplittableInputSource<Object> emptyInputSource =
- (SplittableInputSource<Object>)
localInputSourceAdapter.setupInputSource(Collections.emptyList());
- List<InputSplit<Object>> splitList = emptyInputSource
- .createSplits(mockFormat, mockSplitHint)
- .collect(Collectors.toList());
- Assert.assertTrue(splitList.isEmpty());
- Assert.assertFalse(emptyInputSource.isSplittable());
- Assert.assertFalse(emptyInputSource.needsFormat());
-
Assert.assertNull(emptyInputSource.withSplit(EasyMock.createMock(InputSplit.class)));
- Assert.assertEquals(0, emptyInputSource.estimateNumSplits(mockFormat,
mockSplitHint));
- Assert.assertFalse(emptyInputSource.reader(
- EasyMock.createMock(InputRowSchema.class),
- mockFormat,
- temporaryFolder.newFolder()
- ).read().hasNext());
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]