This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6ddb828c7a Able to filter Cloud objects with glob notation. (#12659)
6ddb828c7a is described below
commit 6ddb828c7a6422f6d2b2a9c59810b5aea7d518ef
Author: Didip Kerabat <[email protected]>
AuthorDate: Thu Jun 23 23:10:08 2022 -0700
Able to filter Cloud objects with glob notation. (#12659)
In a heterogeneous environment, sometimes you don't have control over the
input folder. Upstream systems can use any folder layout they want. In this
situation, S3InputSource is unusable.
Most people like me solved it by using Airflow to fetch the full list of
parquet files and pass it over to Druid. But doing this explodes the JSON spec.
We had a situation where one of the JSON specs was 16MB, which is simply too
much for the Overlord.
This patch allows users to pass {"filter": "*.parquet"} and lets Druid
perform the filtering of the input files.
I am using the glob notation to be consistent with the LocalFirehose syntax.
---
.../data/input/impl/CloudObjectInputSource.java | 85 +++++--
.../input/impl/CloudObjectInputSourceTest.java | 206 ++++++++++++++++
docs/ingestion/native-batch-input-source.md | 18 +-
.../druid/data/input/aliyun/OssInputSource.java | 30 ++-
.../data/input/aliyun/OssInputSourceTest.java | 17 ++
.../druid/data/input/azure/AzureInputSource.java | 27 ++-
.../data/input/azure/AzureInputSourceTest.java | 70 +++++-
extensions-core/google-extensions/pom.xml | 6 +
.../google/GoogleCloudStorageInputSource.java | 32 ++-
.../google/GoogleCloudStorageInputSourceTest.java | 78 ++++++-
.../apache/druid/data/input/s3/S3InputSource.java | 36 ++-
.../druid/data/input/s3/S3InputSourceTest.java | 258 +++++++++++++++++++++
12 files changed, 802 insertions(+), 61 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
index d27bf4faa0..d3bc8fd553 100644
---
a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
+++
b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
@@ -21,6 +21,8 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
@@ -41,9 +43,11 @@ import java.util.stream.Stream;
public abstract class CloudObjectInputSource extends AbstractInputSource
implements SplittableInputSource<List<CloudObjectLocation>>
{
+ private final String scheme;
private final List<URI> uris;
private final List<URI> prefixes;
private final List<CloudObjectLocation> objects;
+ private final String filter;
public CloudObjectInputSource(
String scheme,
@@ -52,20 +56,30 @@ public abstract class CloudObjectInputSource extends
AbstractInputSource
@Nullable List<CloudObjectLocation> objects
)
{
+ this.scheme = scheme;
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;
+ this.filter = null;
- if (!CollectionUtils.isNullOrEmpty(objects)) {
- throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) ||
!CollectionUtils.isNullOrEmpty(prefixes));
- } else if (!CollectionUtils.isNullOrEmpty(uris)) {
- throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
- uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
- } else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
- prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme,
uri));
- } else {
- throwIfIllegalArgs(true);
- }
+ illegalArgsChecker();
+ }
+
+ public CloudObjectInputSource(
+ String scheme,
+ @Nullable List<URI> uris,
+ @Nullable List<URI> prefixes,
+ @Nullable List<CloudObjectLocation> objects,
+ @Nullable String filter
+ )
+ {
+ this.scheme = scheme;
+ this.uris = uris;
+ this.prefixes = prefixes;
+ this.objects = objects;
+ this.filter = filter;
+
+ illegalArgsChecker();
}
@JsonProperty
@@ -87,6 +101,13 @@ public abstract class CloudObjectInputSource extends
AbstractInputSource
return objects;
}
+ @Nullable
+ @JsonProperty
+ public String getFilter()
+ {
+ return filter;
+ }
+
/**
* Create the correct {@link InputEntity} for this input source given a
split on a {@link CloudObjectLocation}. This
* is called internally by {@link #formattableReader} and operates on the
output of {@link #createSplits}.
@@ -98,6 +119,9 @@ public abstract class CloudObjectInputSource extends
AbstractInputSource
* this input sources backend API. This is called internally by {@link
#createSplits} and {@link #estimateNumSplits},
* only if {@link #prefixes} is set, otherwise the splits are created
directly from {@link #uris} or {@link #objects}.
* Calling if {@link #prefixes} is not set is likely to either lead to an
empty iterator or null pointer exception.
+ *
+ * If {@link #filter} is set, the filter will be applied on {@link #uris} or
{@link #objects}.
+ * {@link #filter} uses a glob notation, for example: "*.parquet".
*/
protected abstract Stream<InputSplit<List<CloudObjectLocation>>>
getPrefixesSplitStream(SplitHintSpec splitHintSpec);
@@ -108,12 +132,23 @@ public abstract class CloudObjectInputSource extends
AbstractInputSource
)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
- return objects.stream().map(object -> new
InputSplit<>(Collections.singletonList(object)));
+ Stream<CloudObjectLocation> objectStream = objects.stream();
+
+ if (StringUtils.isNotBlank(filter)) {
+ objectStream = objectStream.filter(object ->
FilenameUtils.wildcardMatch(object.getPath(), filter));
+ }
+
+ return objectStream.map(object -> new
InputSplit<>(Collections.singletonList(object)));
}
+
if (!CollectionUtils.isNullOrEmpty(uris)) {
- return uris.stream()
- .map(CloudObjectLocation::new)
- .map(object -> new
InputSplit<>(Collections.singletonList(object)));
+ Stream<URI> uriStream = uris.stream();
+
+ if (StringUtils.isNotBlank(filter)) {
+ uriStream = uriStream.filter(uri ->
FilenameUtils.wildcardMatch(uri.toString(), filter));
+ }
+
+ return uriStream.map(CloudObjectLocation::new).map(object -> new
InputSplit<>(Collections.singletonList(object)));
}
return getPrefixesSplitStream(getSplitHintSpecOrDefault(splitHintSpec));
@@ -164,15 +199,31 @@ public abstract class CloudObjectInputSource extends
AbstractInputSource
return false;
}
CloudObjectInputSource that = (CloudObjectInputSource) o;
- return Objects.equals(uris, that.uris) &&
+ return Objects.equals(scheme, that.scheme) &&
+ Objects.equals(uris, that.uris) &&
Objects.equals(prefixes, that.prefixes) &&
- Objects.equals(objects, that.objects);
+ Objects.equals(objects, that.objects) &&
+ Objects.equals(filter, that.filter);
}
@Override
public int hashCode()
{
- return Objects.hash(uris, prefixes, objects);
+ return Objects.hash(scheme, uris, prefixes, objects, filter);
+ }
+
+ private void illegalArgsChecker() throws IllegalArgumentException
+ {
+ if (!CollectionUtils.isNullOrEmpty(objects)) {
+ throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) ||
!CollectionUtils.isNullOrEmpty(prefixes));
+ } else if (!CollectionUtils.isNullOrEmpty(uris)) {
+ throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
+ uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
+ } else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
+ prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme,
uri));
+ } else {
+ throwIfIllegalArgs(true);
+ }
}
private void throwIfIllegalArgs(boolean clause) throws
IllegalArgumentException
diff --git
a/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java
b/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java
new file mode 100644
index 0000000000..70421c6afa
--- /dev/null
+++
b/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class CloudObjectInputSourceTest
+{
+ private static final String SCHEME = "s3";
+
+ private static final List<URI> URIS = Collections.singletonList(
+ URI.create("s3://foo/bar/file.csv")
+ );
+
+ private static final List<URI> URIS2 = Arrays.asList(
+ URI.create("s3://foo/bar/file.csv"),
+ URI.create("s3://bar/foo/file2.parquet")
+ );
+
+ private static final List<URI> PREFIXES = Arrays.asList(
+ URI.create("s3://foo/bar/"),
+ URI.create("s3://bar/foo/")
+ );
+
+ private static final List<CloudObjectLocation> OBJECTS =
Collections.singletonList(
+ new CloudObjectLocation(URI.create("s3://foo/bar/file.csv"))
+ );
+
+ private static final List<CloudObjectLocation> OBJECTS_BEFORE_FILTER =
Arrays.asList(
+ new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
+ new CloudObjectLocation(URI.create("s3://bar/foo/file2.parquet"))
+ );
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testGetUris()
+ {
+ CloudObjectInputSource inputSource =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, URIS, null, null, null)
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ Assert.assertEquals(
+ URIS,
+ inputSource.getUris()
+ );
+ }
+
+ @Test
+ public void testGetPrefixes()
+ {
+ CloudObjectInputSource inputSource =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, null, PREFIXES, null, null)
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ Assert.assertEquals(
+ PREFIXES,
+ inputSource.getPrefixes()
+ );
+ }
+
+ @Test
+ public void testGetFilter()
+ {
+ CloudObjectInputSource inputSource =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, URIS, null, null, "*.parquet")
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ Assert.assertEquals("*.parquet", inputSource.getFilter());
+ }
+
+ @Test
+ public void testInequality()
+ {
+ CloudObjectInputSource inputSource1 =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, URIS, null, null, "*.parquet")
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ CloudObjectInputSource inputSource2 =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, URIS, null, null, "*.csv")
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ Assert.assertEquals("*.parquet", inputSource1.getFilter());
+ Assert.assertEquals("*.csv", inputSource2.getFilter());
+ Assert.assertFalse(inputSource2.equals(inputSource1));
+ }
+
+ @Test
+ public void testWithUrisFilter()
+ {
+ CloudObjectInputSource inputSource =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, URIS2, null, null, "*.csv")
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ new MaxSizeSplitHintSpec(null, 1)
+ );
+
+ List<CloudObjectLocation> returnedLocations =
splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
+
+ List<URI> returnedLocationUris = returnedLocations.stream().map(object ->
object.toUri(SCHEME)).collect(Collectors.toList());
+
+ Assert.assertEquals("*.csv", inputSource.getFilter());
+ Assert.assertEquals(URIS, returnedLocationUris);
+ }
+
+ @Test
+ public void testWithUris()
+ {
+ CloudObjectInputSource inputSource =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, URIS, null, null, null)
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ new MaxSizeSplitHintSpec(null, 1)
+ );
+
+ List<CloudObjectLocation> returnedLocations =
splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
+
+ List<URI> returnedLocationUris = returnedLocations.stream().map(object ->
object.toUri(SCHEME)).collect(Collectors.toList());
+
+ Assert.assertEquals(null, inputSource.getFilter());
+ Assert.assertEquals(URIS, returnedLocationUris);
+ }
+
+ @Test
+ public void testWithObjectsFilter()
+ {
+ CloudObjectInputSource inputSource =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, null, null, OBJECTS_BEFORE_FILTER, "*.csv")
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ new MaxSizeSplitHintSpec(null, 1)
+ );
+
+ List<CloudObjectLocation> returnedLocations =
splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
+
+ List<URI> returnedLocationUris = returnedLocations.stream().map(object ->
object.toUri(SCHEME)).collect(Collectors.toList());
+
+ Assert.assertEquals("*.csv", inputSource.getFilter());
+ Assert.assertEquals(URIS, returnedLocationUris);
+ }
+
+ @Test
+ public void testWithObjects()
+ {
+ CloudObjectInputSource inputSource =
Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
+ .useConstructor(SCHEME, null, null, OBJECTS, null)
+ .defaultAnswer(Mockito.CALLS_REAL_METHODS)
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ new MaxSizeSplitHintSpec(null, 1)
+ );
+
+ List<CloudObjectLocation> returnedLocations =
splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
+
+ Assert.assertEquals(null, inputSource.getFilter());
+ Assert.assertEquals(OBJECTS, returnedLocations);
+ }
+}
diff --git a/docs/ingestion/native-batch-input-source.md
b/docs/ingestion/native-batch-input-source.md
index 4d60a82ee9..8ce42073dc 100644
--- a/docs/ingestion/native-batch-input-source.md
+++ b/docs/ingestion/native-batch-input-source.md
@@ -29,7 +29,7 @@ For general information on native batch indexing and parallel
task indexing, see
## S3 input source
-> You need to include the
[`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension
to use the S3 input source.
+> You need to include the
[`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension
to use the S3 input source.
The S3 input source reads objects directly from S3. You can specify either:
- a list of S3 URI strings
@@ -46,6 +46,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
+ "filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"]
},
"inputFormat": {
@@ -62,6 +63,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
+ "filter": "*.parquet",
"prefixes": ["s3://foo/bar/", "s3://bar/foo/"]
},
"inputFormat": {
@@ -79,6 +81,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
+ "filter": "*.json",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
@@ -98,6 +101,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
+ "filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
@@ -118,6 +122,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
+ "filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
@@ -139,6 +144,7 @@ Sample specs:
|uris|JSON array of URIs where S3 objects to be ingested are
located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be
ingested. Empty objects starting with one of the given prefixes will be
skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or
`objects` must be set|
+|filter|A wildcard filter for files. See
[here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter)
for more information. Files matching the filter criteria are considered for
ingestion. Files not matching the filter criteria are ignored.|None|no|
|properties|Properties Object for overriding the default S3 configuration. See
below for more information.|None|No (defaults will be used if not given)
Note that the S3 input source will skip all empty objects only when `prefixes`
is specified.
@@ -179,6 +185,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
+ "filter": "*.json",
"uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"]
},
"inputFormat": {
@@ -195,6 +202,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
+ "filter": "*.parquet",
"prefixes": ["gs://foo/bar/", "gs://bar/foo/"]
},
"inputFormat": {
@@ -212,6 +220,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
+ "filter": "*.json",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
@@ -231,6 +240,7 @@ Sample specs:
|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are
located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage
objects to be ingested. Empty objects starting with one of the given prefixes
will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Google Cloud Storage objects to be
ingested.|None|`uris` or `prefixes` or `objects` must be set|
+|filter|A wildcard filter for files. See
[here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter)
for more information. Files matching the filter criteria are considered for
ingestion. Files not matching the filter criteria are ignored.|None|no|
Note that the Google Cloud Storage input source will skip all empty objects
only when `prefixes` is specified.
@@ -256,6 +266,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
+ "filter": "*.json",
"uris": ["azure://container/prefix1/file.json",
"azure://container/prefix2/file2.json"]
},
"inputFormat": {
@@ -272,6 +283,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
+ "filter": "*.parquet",
"prefixes": ["azure://container/prefix1/",
"azure://container/prefix2/"]
},
"inputFormat": {
@@ -289,6 +301,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
+ "filter": "*.json",
"objects": [
{ "bucket": "container", "path": "prefix1/file1.json"},
{ "bucket": "container", "path": "prefix2/file2.json"}
@@ -308,6 +321,7 @@ Sample specs:
|uris|JSON array of URIs where the Azure objects to be ingested are located,
in the form "azure://\<container>/\<path-to-file\>"|None|`uris` or `prefixes`
or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Azure objects to
ingest, in the form "azure://\<container>/\<prefix\>". Empty objects starting
with one of the given prefixes are skipped.|None|`uris` or `prefixes` or
`objects` must be set|
|objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or
`objects` must be set|
+|filter|A wildcard filter for files. See
[here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter)
for more information. Files matching the filter criteria are considered for
ingestion. Files not matching the filter criteria are ignored.|None|no|
Note that the Azure input source skips all empty objects only when `prefixes`
is specified.
@@ -546,7 +560,7 @@ Sample spec:
|property|description|required?|
|--------|-----------|---------|
|type|This should be "local".|yes|
-|filter|A wildcard filter for files. See
[here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter)
for more information.|yes if `baseDir` is specified|
+|filter|A wildcard filter for files. See
[here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter)
for more information. Files matching the filter criteria are considered for
ingestion. Files not matching the filter criteria are ignored.|yes if `baseDir`
is specified|
|baseDir|Directory to search recursively for files to be ingested. Empty files
under the `baseDir` will be skipped.|At least one of `baseDir` or `files`
should be specified|
|files|File paths to ingest. Some files can be ignored to avoid ingesting
duplicate files if they are located under the specified `baseDir`. Empty files
will be skipped.|At least one of `baseDir` or `files` should be specified|
diff --git
a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssInputSource.java
b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssInputSource.java
index e8559e5606..fbb2bb4b5c 100644
---
a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssInputSource.java
+++
b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/data/input/aliyun/OssInputSource.java
@@ -27,6 +27,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
+import com.google.common.collect.Iterators;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@@ -74,10 +77,11 @@ public class OssInputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
+ @JsonProperty("filter") @Nullable String filter,
@JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
)
{
- super(OssStorageDruidModule.SCHEME, uris, prefixes, objects);
+ super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, filter);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig,
"inputDataConfig");
Preconditions.checkNotNull(client, "client");
this.inputSourceConfig = inputSourceConfig;
@@ -130,6 +134,7 @@ public class OssInputSource extends CloudObjectInputSource
null,
null,
split.get(),
+ getFilter(),
getOssInputSourceConfig()
);
}
@@ -163,16 +168,29 @@ public class OssInputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
+ ", filter=" + getFilter() +
", ossInputSourceConfig=" + getOssInputSourceConfig() +
'}';
}
private Iterable<OSSObjectSummary> getIterableObjectsFromPrefixes()
{
- return () -> OssUtils.objectSummaryIterator(
- clientSupplier.get(),
- getPrefixes(),
- inputDataConfig.getMaxListingLength()
- );
+ return () -> {
+ Iterator<OSSObjectSummary> iterator = OssUtils.objectSummaryIterator(
+ clientSupplier.get(),
+ getPrefixes(),
+ inputDataConfig.getMaxListingLength()
+ );
+
+ // Skip files that didn't match filter.
+ if (StringUtils.isNotBlank(getFilter())) {
+ iterator = Iterators.filter(
+ iterator,
+ object -> FilenameUtils.wildcardMatch(object.getKey(), getFilter())
+ );
+ }
+
+ return iterator;
+ };
}
}
diff --git
a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
index b7e95402e2..6a677325a3 100644
---
a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
+++
b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java
@@ -142,6 +142,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
+ null,
null
);
final OssInputSource serdeWithUris =
MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
@@ -157,6 +158,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
PREFIXES,
null,
+ null,
null
);
final OssInputSource serdeWithPrefixes =
@@ -173,6 +175,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
null
);
final OssInputSource serdeWithPrefixes =
@@ -196,6 +199,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
mockConfigPropertiesWithoutKeyAndSecret
);
Assert.assertNotNull(withPrefixes);
@@ -215,6 +219,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
CLOUD_CONFIG_PROPERTIES
);
final OssInputSource serdeWithPrefixes =
@@ -234,6 +239,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
null
);
final OssInputSource serdeWithPrefixes =
@@ -251,6 +257,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(),
ImmutableList.of(),
EXPECTED_LOCATION,
+ null,
null
);
final OssInputSource serdeWithPrefixes =
@@ -269,6 +276,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
EXPECTED_LOCATION,
+ null,
null
);
}
@@ -284,6 +292,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
ImmutableList.of(),
+ null,
null
);
}
@@ -299,6 +308,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(),
PREFIXES,
EXPECTED_LOCATION,
+ null,
null
);
}
@@ -312,6 +322,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
+ null,
null
);
@@ -337,6 +348,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
PREFIXES,
null,
+ null,
null
);
@@ -363,6 +375,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
PREFIXES,
null,
+ null,
null
);
@@ -392,6 +405,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
PREFIXES,
null,
+ null,
null
);
@@ -420,6 +434,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
+ null,
null
);
@@ -450,6 +465,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
+ null,
null
);
@@ -493,6 +509,7 @@ public class OssInputSourceTest extends
InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
+ null,
null
);
diff --git
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
index d2382069bc..dd28e9273d 100644
---
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
+++
b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java
@@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
@@ -68,10 +71,11 @@ public class AzureInputSource extends CloudObjectInputSource
@JacksonInject AzureInputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
- @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
+ @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
+ @JsonProperty("filter") @Nullable String filter
)
{
- super(SCHEME, uris, prefixes, objects);
+ super(SCHEME, uris, prefixes, objects, filter);
this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
this.entityFactory = Preconditions.checkNotNull(entityFactory,
"AzureEntityFactory");
this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
@@ -96,7 +100,8 @@ public class AzureInputSource extends CloudObjectInputSource
inputDataConfig,
null,
null,
- split.get()
+ split.get(),
+ getFilter()
);
}
@@ -113,6 +118,7 @@ public class AzureInputSource extends CloudObjectInputSource
getIterableObjectsFromPrefixes().iterator(),
blobHolder -> new InputFileAttribute(blobHolder.getBlobLength())
);
+
return Streams.sequentialStreamFrom(splitIterator)
.map(objects -> objects.stream()
.map(azureCloudBlobToLocationConverter::createCloudObjectLocation)
@@ -122,7 +128,19 @@ public class AzureInputSource extends
CloudObjectInputSource
private Iterable<CloudBlobHolder> getIterableObjectsFromPrefixes()
{
- return azureCloudBlobIterableFactory.create(getPrefixes(),
inputDataConfig.getMaxListingLength());
+ return () -> {
+ Iterator<CloudBlobHolder> iterator =
azureCloudBlobIterableFactory.create(getPrefixes(),
inputDataConfig.getMaxListingLength()).iterator();
+
+ // Skip files that didn't match filter.
+ if (StringUtils.isNotBlank(getFilter())) {
+ iterator = Iterators.filter(
+ iterator,
+ object -> FilenameUtils.wildcardMatch(object.getName(),
getFilter())
+ );
+ }
+
+ return iterator;
+ };
}
@Override
@@ -165,6 +183,7 @@ public class AzureInputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
+ ", filter=" + getFilter() +
'}';
}
}
diff --git
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index 4a9773a0f2..7feef58799 100644
---
a/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++
b/extensions-core/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -20,7 +20,9 @@
package org.apache.druid.data.input.azure;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.commons.io.FilenameUtils;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
@@ -54,7 +56,7 @@ public class AzureInputSourceTest extends EasyMockSupport
private final List<URI> EMPTY_PREFIXES = ImmutableList.of();
private final List<CloudObjectLocation> EMPTY_OBJECTS = ImmutableList.of();
private static final String CONTAINER = "CONTAINER";
- private static final String BLOB_PATH = "BLOB_PATH";
+ private static final String BLOB_PATH = "BLOB_PATH.csv";
private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new
CloudObjectLocation(CONTAINER, BLOB_PATH);
private static final int MAX_LISTING_LENGTH = 10;
@@ -106,7 +108,8 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
- EMPTY_OBJECTS
+ EMPTY_OBJECTS,
+ null
);
}
@@ -126,7 +129,8 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
- objects
+ objects,
+ null
);
Assert.assertEquals(1, inputSplit.get().size());
@@ -159,7 +163,55 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
- EMPTY_OBJECTS
+ EMPTY_OBJECTS,
+ null
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream =
azureInputSource.getPrefixesSplitStream(
+ new MaxSizeSplitHintSpec(null, 1)
+ );
+
+ List<List<CloudObjectLocation>> actualCloudLocationList =
cloudObjectStream.map(InputSplit::get)
+
.collect(Collectors.toList());
+ verifyAll();
+ Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
+ }
+
+ @Test
+ public void
test_getPrefixesSplitStream_withFilter_successfullyCreatesCloudLocation_returnsExpectedLocations()
+ {
+ List<URI> prefixes = ImmutableList.of(PREFIX_URI);
+ List<List<CloudObjectLocation>> expectedCloudLocations =
ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
+ List<CloudBlobHolder> expectedCloudBlobs =
ImmutableList.of(cloudBlobDruid1);
+ Iterator<CloudBlobHolder> expectedCloudBlobsIterator =
expectedCloudBlobs.iterator();
+ String filter = "*.csv";
+
+ expectedCloudBlobsIterator = Iterators.filter(
+ expectedCloudBlobsIterator,
+ object -> FilenameUtils.wildcardMatch(object.getName(), filter)
+ );
+
+
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
+ EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes,
MAX_LISTING_LENGTH)).andReturn(
+ azureCloudBlobIterable);
+
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
+
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
+ .andReturn(CLOUD_OBJECT_LOCATION_1);
+
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
+ EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
+
+ replayAll();
+
+ azureInputSource = new AzureInputSource(
+ storage,
+ entityFactory,
+ azureCloudBlobIterableFactory,
+ azureCloudBlobToLocationConverter,
+ inputDataConfig,
+ EMPTY_URIS,
+ prefixes,
+ EMPTY_OBJECTS,
+ filter
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream =
azureInputSource.getPrefixesSplitStream(
@@ -187,7 +239,8 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
- EMPTY_OBJECTS
+ EMPTY_OBJECTS,
+ null
);
SplittableInputSource<List<CloudObjectLocation>> newInputSource =
azureInputSource.withSplit(inputSplit);
@@ -207,11 +260,12 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
- EMPTY_OBJECTS
+ EMPTY_OBJECTS,
+ null
);
String actualToString = azureInputSource.toString();
- Assert.assertEquals("AzureInputSource{uris=[],
prefixes=[azure://container/blob], objects=[]}", actualToString);
+ Assert.assertEquals("AzureInputSource{uris=[],
prefixes=[azure://container/blob], objects=[], filter=null}", actualToString);
}
@Test
@@ -225,6 +279,8 @@ public class AzureInputSourceTest extends EasyMockSupport
.withNonnullFields("azureCloudBlobIterableFactory")
.withNonnullFields("azureCloudBlobToLocationConverter")
.withNonnullFields("inputDataConfig")
+ .withNonnullFields("filter")
+ .withNonnullFields("scheme")
.verify();
}
diff --git a/extensions-core/google-extensions/pom.xml
b/extensions-core/google-extensions/pom.xml
index 0b3ad15fa5..d143dd7802 100644
--- a/extensions-core/google-extensions/pom.xml
+++ b/extensions-core/google-extensions/pom.xml
@@ -63,6 +63,12 @@
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.6</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
diff --git
a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
index 47ba4b7c6a..e928eb4ee7 100644
---
a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
+++
b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
@@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.storage.model.StorageObject;
+import com.google.common.collect.Iterators;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@@ -59,10 +62,11 @@ public class GoogleCloudStorageInputSource extends
CloudObjectInputSource
@JacksonInject GoogleInputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
- @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
+ @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
+ @JsonProperty("filter") @Nullable String filter
)
{
- super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects);
+ super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, filter);
this.storage = storage;
this.inputDataConfig = inputDataConfig;
}
@@ -111,7 +115,7 @@ public class GoogleCloudStorageInputSource extends
CloudObjectInputSource
@Override
public SplittableInputSource<List<CloudObjectLocation>>
withSplit(InputSplit<List<CloudObjectLocation>> split)
{
- return new GoogleCloudStorageInputSource(storage, inputDataConfig, null,
null, split.get());
+ return new GoogleCloudStorageInputSource(storage, inputDataConfig, null,
null, split.get(), getFilter());
}
private CloudObjectLocation byteSourceFromStorageObject(final StorageObject
storageObject)
@@ -121,12 +125,23 @@ public class GoogleCloudStorageInputSource extends
CloudObjectInputSource
private Iterable<StorageObject> storageObjectIterable()
{
- return () ->
- GoogleUtils.lazyFetchingStorageObjectsIterator(
- storage,
- getPrefixes().iterator(),
- inputDataConfig.getMaxListingLength()
+ return () -> {
+ Iterator<StorageObject> iterator =
GoogleUtils.lazyFetchingStorageObjectsIterator(
+ storage,
+ getPrefixes().iterator(),
+ inputDataConfig.getMaxListingLength()
+ );
+
+ // Skip files that didn't match filter.
+ if (StringUtils.isNotBlank(getFilter())) {
+ iterator = Iterators.filter(
+ iterator,
+ object -> FilenameUtils.wildcardMatch(object.getName(),
getFilter())
);
+ }
+
+ return iterator;
+ };
}
@Override
@@ -136,6 +151,7 @@ public class GoogleCloudStorageInputSource extends
CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
+ ", filter=" + getFilter() +
'}';
}
}
diff --git
a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
index 4b42efe0d6..dba662a911 100644
---
a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
+++
b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
@@ -56,7 +56,9 @@ import org.apache.druid.utils.CompressionUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -86,6 +88,12 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
URI.create("gs://bar/foo/file2.csv.gz")
);
+ private static final List<URI> URIS_BEFORE_FILTER = Arrays.asList(
+ URI.create("gs://foo/bar/file.csv"),
+ URI.create("gs://bar/foo/file2.csv"),
+ URI.create("gs://bar/foo/file3.txt")
+ );
+
private static final List<List<CloudObjectLocation>> EXPECTED_OBJECTS =
EXPECTED_URIS.stream()
.map(uri -> Collections.singletonList(new
CloudObjectLocation(uri)))
@@ -96,19 +104,19 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
URI.create("gs://bar/foo")
);
- private static final List<CloudObjectLocation> EXPECTED_LOCATION =
- ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
-
private static final DateTime NOW = DateTimes.nowUtc();
private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world",
NOW.getMillis()));
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
@Test
public void testSerde() throws Exception
{
final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withUris =
- new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG,
EXPECTED_URIS, ImmutableList.of(), null);
+ new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG,
EXPECTED_URIS, ImmutableList.of(), null, null);
final GoogleCloudStorageInputSource serdeWithUris =
mapper.readValue(mapper.writeValueAsString(withUris),
GoogleCloudStorageInputSource.class);
Assert.assertEquals(withUris, serdeWithUris);
@@ -119,7 +127,7 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
{
final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withPrefixes =
- new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG,
ImmutableList.of(), PREFIXES, null);
+ new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG,
ImmutableList.of(), PREFIXES, null, null);
final GoogleCloudStorageInputSource serdeWithPrefixes =
mapper.readValue(mapper.writeValueAsString(withPrefixes),
GoogleCloudStorageInputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@@ -135,7 +143,8 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
null,
- ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz"))
+ ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")),
+ null
);
final GoogleCloudStorageInputSource serdeWithObjects =
mapper.readValue(mapper.writeValueAsString(withObjects),
GoogleCloudStorageInputSource.class);
@@ -147,7 +156,26 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
{
GoogleCloudStorageInputSource inputSource =
- new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG,
EXPECTED_URIS, ImmutableList.of(), null);
+ new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG,
EXPECTED_URIS, ImmutableList.of(), null, null);
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ null
+ );
+ Assert.assertEquals(EXPECTED_OBJECTS,
splits.map(InputSplit::get).collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testWithUrisFilter()
+ {
+ GoogleCloudStorageInputSource inputSource = new
GoogleCloudStorageInputSource(
+ STORAGE,
+ INPUT_DATA_CONFIG,
+ URIS_BEFORE_FILTER,
+ null,
+ null,
+ "*.csv"
+ );
Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@@ -156,6 +184,36 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
Assert.assertEquals(EXPECTED_OBJECTS,
splits.map(InputSplit::get).collect(Collectors.toList()));
}
+ @Test
+ public void testIllegalObjectsAndPrefixes()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ // constructor will explode
+ new GoogleCloudStorageInputSource(
+ STORAGE,
+ INPUT_DATA_CONFIG,
+ null,
+ PREFIXES,
+ EXPECTED_OBJECTS.get(0),
+ "*.csv"
+ );
+ }
+
+ @Test
+ public void testIllegalUrisAndPrefixes()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ // constructor will explode
+ new GoogleCloudStorageInputSource(
+ STORAGE,
+ INPUT_DATA_CONFIG,
+ URIS_BEFORE_FILTER,
+ PREFIXES,
+ null,
+ "*.csv"
+ );
+ }
+
@Test
public void testWithPrefixesSplit() throws IOException
{
@@ -168,7 +226,7 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource =
- new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null,
PREFIXES, null);
+ new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null,
PREFIXES, null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@@ -190,7 +248,7 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource =
- new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null,
PREFIXES, null);
+ new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null,
PREFIXES, null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@@ -221,6 +279,7 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
PREFIXES,
+ null,
null
);
@@ -264,6 +323,7 @@ public class GoogleCloudStorageInputSourceTest extends
InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
PREFIXES,
+ null,
null
);
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 20688ab072..e94b679a25 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -33,6 +33,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
+import com.google.common.collect.Iterators;
+import org.apache.commons.io.FilenameUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@@ -94,11 +96,12 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
+ @JsonProperty("filter") @Nullable String filter,
@JsonProperty("properties") @Nullable S3InputSourceConfig
s3InputSourceConfig,
@JacksonInject AWSCredentialsProvider awsCredentialsProvider
)
{
- super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
+ super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, filter);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig,
"S3DataSegmentPusherConfig");
Preconditions.checkNotNull(s3Client, "s3Client");
this.s3InputSourceConfig = s3InputSourceConfig;
@@ -138,10 +141,11 @@ public class S3InputSource extends CloudObjectInputSource
List<URI> uris,
List<URI> prefixes,
List<CloudObjectLocation> objects,
+ String filter,
S3InputSourceConfig s3InputSourceConfig
)
{
- this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects,
s3InputSourceConfig, null);
+ this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects,
filter, s3InputSourceConfig, null);
}
@VisibleForTesting
@@ -152,11 +156,12 @@ public class S3InputSource extends CloudObjectInputSource
List<URI> uris,
List<URI> prefixes,
List<CloudObjectLocation> objects,
+ String filter,
S3InputSourceConfig s3InputSourceConfig,
int maxRetries
)
{
- this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects,
s3InputSourceConfig, null);
+ this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects,
filter, s3InputSourceConfig, null);
this.maxRetries = maxRetries;
}
@@ -233,6 +238,7 @@ public class S3InputSource extends CloudObjectInputSource
null,
null,
split.get(),
+ getFilter(),
getS3InputSourceConfig(),
awsCredentialsProvider
);
@@ -267,16 +273,30 @@ public class S3InputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
+ ", filter=" + getFilter() +
", s3InputSourceConfig=" + getS3InputSourceConfig() +
'}';
}
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
- return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(),
- getPrefixes(),
-
inputDataConfig.getMaxListingLength(),
- maxRetries
- );
+ return () -> {
+ Iterator<S3ObjectSummary> iterator = S3Utils.objectSummaryIterator(
+ s3ClientSupplier.get(),
+ getPrefixes(),
+ inputDataConfig.getMaxListingLength(),
+ maxRetries
+ );
+
+ // Skip files that didn't match filter.
+ if (org.apache.commons.lang.StringUtils.isNotBlank(getFilter())) {
+ iterator = Iterators.filter(
+ iterator,
+ object -> FilenameUtils.wildcardMatch(object.getKey(), getFilter())
+ );
+ }
+
+ return iterator;
+ };
}
}
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index ff8f9682ff..739e3a5889 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -106,6 +106,11 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
private static final S3InputDataConfig INPUT_DATA_CONFIG;
private static final int MAX_LISTING_LENGTH = 10;
+ private static final List<CloudObjectLocation> EXPECTED_OBJECTS =
Arrays.asList(
+ new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
+ new CloudObjectLocation(URI.create("s3://bar/foo/file2.csv"))
+ );
+
private static final List<URI> EXPECTED_URIS = Arrays.asList(
URI.create("s3://foo/bar/file.csv"),
URI.create("s3://bar/foo/file2.csv")
@@ -116,6 +121,18 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
URI.create("s3://bar/foo/file2.csv.gz")
);
+ private static final List<CloudObjectLocation> OBJECTS_BEFORE_FILTER =
Arrays.asList(
+ new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
+ new CloudObjectLocation(URI.create("s3://bar/foo/file2.csv")),
+ new CloudObjectLocation(URI.create("s3://bar/foo/file3.txt"))
+ );
+
+ private static final List<URI> URIS_BEFORE_FILTER = Arrays.asList(
+ URI.create("s3://foo/bar/file.csv"),
+ URI.create("s3://bar/foo/file2.csv"),
+ URI.create("s3://bar/foo/file3.txt")
+ );
+
private static final List<List<CloudObjectLocation>> EXPECTED_COORDS =
EXPECTED_URIS.stream()
.map(uri -> Collections.singletonList(new
CloudObjectLocation(uri)))
@@ -147,6 +164,66 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
+ @Test
+ public void testGetUris()
+ {
+ final S3InputSource withUris = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ EXPECTED_URIS,
+ null,
+ null,
+ null,
+ null
+ );
+
+ Assert.assertEquals(
+ EXPECTED_URIS,
+ withUris.getUris()
+ );
+ }
+
+ @Test
+ public void testGetPrefixes()
+ {
+ final S3InputSource withPrefixes = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ PREFIXES,
+ null,
+ null,
+ null
+ );
+
+ Assert.assertEquals(
+ PREFIXES,
+ withPrefixes.getPrefixes()
+ );
+ }
+
+ @Test
+ public void testGetFilter()
+ {
+ final S3InputSource withUris = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ EXPECTED_URIS,
+ null,
+ null,
+ "*.parquet",
+ null
+ );
+
+ Assert.assertEquals(
+ "*.parquet",
+ withUris.getFilter()
+ );
+ }
+
@Test
public void testSerdeWithUris() throws Exception
{
@@ -157,6 +234,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
+ null,
null
);
final S3InputSource serdeWithUris =
MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
@@ -173,6 +251,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
PREFIXES,
null,
+ null,
null
);
final S3InputSource serdeWithPrefixes =
@@ -190,6 +269,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
null
);
final S3InputSource serdeWithPrefixes =
@@ -213,6 +293,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
CLOUD_CONFIG_PROPERTIES
);
final S3InputSource serdeWithPrefixes =
@@ -243,6 +324,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
mockConfigPropertiesWithoutKeyAndSecret
);
Assert.assertNotNull(withPrefixes);
@@ -265,6 +347,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
CLOUD_CONFIG_PROPERTIES
);
final S3InputSource serdeWithPrefixes =
@@ -286,6 +369,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
+ null,
null
);
@@ -305,6 +389,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(),
ImmutableList.of(),
EXPECTED_LOCATION,
+ null,
null
);
final S3InputSource serdeWithPrefixes =
@@ -312,6 +397,74 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
}
+ @Test
+ public void testWithNullJsonProps()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ // constructor will explode
+ new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ }
+
+ @Test
+ public void testIllegalObjectsAndUris()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ // constructor will explode
+ new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ EXPECTED_URIS,
+ null,
+ EXPECTED_OBJECTS,
+ null,
+ null
+ );
+ }
+
+ @Test
+ public void testIllegalObjectsAndPrefixes()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ // constructor will explode
+ new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ PREFIXES,
+ EXPECTED_OBJECTS,
+ null,
+ null
+ );
+ }
+
+ @Test
+ public void testIllegalUrisAndPrefixes()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ // constructor will explode
+ new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ EXPECTED_URIS,
+ PREFIXES,
+ null,
+ null,
+ null
+ );
+ }
+
@Test
public void testSerdeWithInvalidArgs()
{
@@ -324,6 +477,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
EXPECTED_LOCATION,
+ null,
null
);
}
@@ -340,6 +494,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
ImmutableList.of(),
+ null,
null
);
}
@@ -356,6 +511,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(),
PREFIXES,
EXPECTED_LOCATION,
+ null,
null
);
}
@@ -370,6 +526,73 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
+ null,
+ null
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ null
+ );
+
+ Assert.assertEquals(EXPECTED_COORDS,
splits.map(InputSplit::get).collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testWithUrisFilter()
+ {
+ S3InputSource inputSource = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ URIS_BEFORE_FILTER,
+ null,
+ null,
+ "*.csv",
+ null
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ null
+ );
+
+ Assert.assertEquals(EXPECTED_COORDS,
splits.map(InputSplit::get).collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testWithObjectsFilter()
+ {
+ S3InputSource inputSource = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ OBJECTS_BEFORE_FILTER,
+ "*.csv",
+ null
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ null
+ );
+
+ Assert.assertEquals(EXPECTED_COORDS,
splits.map(InputSplit::get).collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testWithoutObjectsFilter()
+ {
+ S3InputSource inputSource = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ EXPECTED_OBJECTS,
+ null,
null
);
@@ -396,6 +619,35 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
PREFIXES,
null,
+ null,
+ null
+ );
+
+ Stream<InputSplit<List<CloudObjectLocation>>> splits =
inputSource.createSplits(
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ new MaxSizeSplitHintSpec(null, 1)
+ );
+
+ Assert.assertEquals(EXPECTED_COORDS,
splits.map(InputSplit::get).collect(Collectors.toList()));
+ EasyMock.verify(S3_CLIENT);
+ }
+
+ @Test
+ public void testGetPrefixesSplitStreamWithFilter()
+ {
+ EasyMock.reset(S3_CLIENT);
+ expectListObjects(PREFIXES.get(0),
ImmutableList.of(URIS_BEFORE_FILTER.get(0)), CONTENT);
+ expectListObjects(PREFIXES.get(1),
ImmutableList.of(URIS_BEFORE_FILTER.get(1), URIS_BEFORE_FILTER.get(2)),
CONTENT);
+ EasyMock.replay(S3_CLIENT);
+
+ S3InputSource inputSource = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ PREFIXES,
+ null,
+ "*.csv",
null
);
@@ -423,6 +675,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
PREFIXES,
null,
+ null,
null
);
@@ -453,6 +706,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
PREFIXES,
null,
+ null,
null
);
@@ -482,6 +736,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
+ null,
null
);
@@ -513,6 +768,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
+ null,
null
);
@@ -556,6 +812,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(PREFIXES.get(0)),
null,
null,
+ null,
3 // only have three retries since they are slow
);
@@ -599,6 +856,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
+ null,
null
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]