This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8f7877f2855 [HUDI-6530] Applying schema during ingestion using a schema provider for s3/gcs metadata job (#9191)
8f7877f2855 is described below
commit 8f7877f28559f49b90225a279d5a7ad50c689c0b
Author: lokesh-lingarajan-0310 <[email protected]>
AuthorDate: Fri Jul 14 08:39:36 2023 -0700
[HUDI-6530] Applying schema during ingestion using a schema provider for s3/gcs metadata job (#9191)
Co-authored-by: Lokesh Lingarajan <[email protected]>
---
.../org/apache/hudi/utilities/UtilHelpers.java | 8 +
.../hudi/utilities/sources/GcsEventsSource.java | 11 +-
.../hudi/utilities/sources/S3EventsSource.java | 17 +-
.../utilities/sources/TestGcsEventsSource.java | 42 ++++-
.../hudi/utilities/sources/TestS3EventsSource.java | 4 +-
.../resources/streamer-config/gcs-metadata.avsc | 60 ++++---
.../resources/streamer-config/s3-metadata.avsc | 188 +++++++++++++++++++++
7 files changed, 299 insertions(+), 31 deletions(-)
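
For context, this change lets the S3/GCS metadata (events) sources read their JSON event payloads with an explicit schema supplied by a SchemaProvider instead of relying on Spark's per-batch schema inference. A minimal wiring sketch, mirroring the test setup in this patch; the schema-file property key and path are illustrative, jsc/sparkSession are the existing JavaSparkContext/SparkSession, and the usual SQS source properties are omitted:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
    import org.apache.hudi.utilities.schema.SchemaProvider;
    import org.apache.hudi.utilities.sources.S3EventsSource;

    TypedProperties props = new TypedProperties();
    // Illustrative key/path: point FilebasedSchemaProvider at the Avro schema describing the S3 event metadata.
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", "hdfs:///configs/s3-metadata.avsc");
    SchemaProvider schemaProvider = new FilebasedSchemaProvider(props, jsc);
    // With a non-null provider, fetchNextBatch() applies the converted StructType when reading the event JSON.
    S3EventsSource source = new S3EventsSource(props, jsc, sparkSession, schemaProvider);
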
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index a0d241752c5..35a5c9fcb47 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -60,6 +60,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
+import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
@@ -193,6 +194,13 @@ public class UtilHelpers {
}
+ public static StructType getSourceSchema(SchemaProvider schemaProvider) {
+ if (schemaProvider != null && schemaProvider.getSourceSchema() != null && schemaProvider.getSourceSchema() != InputBatch.NULL_SCHEMA) {
+ return AvroConversionUtils.convertAvroSchemaToStructType(schemaProvider.getSourceSchema());
+ }
+ return null;
+ }
+
public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt, Option<Schema> sourceSchema,
boolean isErrorTableWriterEnabled) throws IOException {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
index dfc9b5b2b2e..89ce7eddf54 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
@@ -35,6 +36,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +98,7 @@ absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
public class GcsEventsSource extends RowSource {
private final PubsubMessagesFetcher pubsubMessagesFetcher;
+ private final SchemaProvider schemaProvider;
private final boolean ackMessages;
private final List<String> messagesToAck = new ArrayList<>();
@@ -121,6 +124,7 @@ public class GcsEventsSource extends RowSource {
this.pubsubMessagesFetcher = pubsubMessagesFetcher;
this.ackMessages = props.getBoolean(ACK_MESSAGES.key(), ACK_MESSAGES.defaultValue());
+ this.schemaProvider = schemaProvider;
LOG.info("Created GcsEventsSource");
}
@@ -146,7 +150,12 @@ public class GcsEventsSource extends RowSource {
LOG.info("Returning checkpoint value: " + CHECKPOINT_VALUE_ZERO);
- return Pair.of(Option.of(sparkSession.read().json(eventRecords)), CHECKPOINT_VALUE_ZERO);
+ StructType sourceSchema = UtilHelpers.getSourceSchema(schemaProvider);
+ if (sourceSchema != null) {
+ return Pair.of(Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)), CHECKPOINT_VALUE_ZERO);
+ } else {
+ return Pair.of(Option.of(sparkSession.read().json(eventRecords)), CHECKPOINT_VALUE_ZERO);
+ }
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
index df84217381b..78a3b58bcee 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.S3SourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
@@ -32,6 +33,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
import java.io.Closeable;
import java.io.IOException;
@@ -47,6 +49,7 @@ import java.util.List;
public class S3EventsSource extends RowSource implements Closeable {
private final S3EventsMetaSelector pathSelector;
+ private final SchemaProvider schemaProvider;
private final List<Message> processedMessages = new ArrayList<>();
AmazonSQS sqs;
@@ -58,6 +61,7 @@ public class S3EventsSource extends RowSource implements Closeable {
super(props, sparkContext, sparkSession, schemaProvider);
this.pathSelector = S3EventsMetaSelector.createSourceSelector(props);
this.sqs = this.pathSelector.createAmazonSqsClient();
+ this.schemaProvider = schemaProvider;
}
/**
@@ -76,9 +80,16 @@ public class S3EventsSource extends RowSource implements Closeable {
return Pair.of(Option.empty(), selectPathsWithLatestSqsMessage.getRight());
} else {
Dataset<String> eventRecords = sparkSession.createDataset(selectPathsWithLatestSqsMessage.getLeft(), Encoders.STRING());
- return Pair.of(
- Option.of(sparkSession.read().json(eventRecords)),
- selectPathsWithLatestSqsMessage.getRight());
+ StructType sourceSchema = UtilHelpers.getSourceSchema(schemaProvider);
+ if (sourceSchema != null) {
+ return Pair.of(
+ Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)),
+ selectPathsWithLatestSqsMessage.getRight());
+ } else {
+ return Pair.of(
+ Option.of(sparkSession.read().json(eventRecords)),
+ selectPathsWithLatestSqsMessage.getRight());
+ }
}
}
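
Both sources follow the same pattern: when UtilHelpers.getSourceSchema(schemaProvider) returns a non-null StructType it is applied to the JSON read, otherwise the previous inference-based path is kept. A minimal sketch of the two read paths, using the variable names from the patch:

    // Fallback path: Spark samples the JSON and infers a schema per batch, so empty batches or
    // records with missing optional fields can yield a different StructType from one batch to the next.
    Dataset<Row> inferred = sparkSession.read().json(eventRecords);

    // New path: the Avro source schema is converted to a StructType and applied up front,
    // keeping field names, types, and nullability stable across batches.
    Dataset<Row> typed = sparkSession.read().schema(sourceSchema).json(eventRecords);
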
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
index 92cb3dc4431..653cb823233 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
@@ -63,7 +63,7 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
@BeforeEach
public void beforeEach() throws Exception {
- schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
+ schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS("delta-streamer-config", "gcs-metadata.avsc"), jsc);
MockitoAnnotations.initMocks(this);
props = new TypedProperties();
@@ -86,12 +86,42 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
@Test
public void shouldReturnDataOnValidMessages() {
- ReceivedMessage msg1 = fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
- ReceivedMessage msg2 = fileCreateMessage("objectId-2", "{'data':{'bucket':'bucket-2'}}");
+ ReceivedMessage msg1 = fileCreateMessage("objectId-1", "{\n"
+ + " \"kind\": \"storage#object\",\n"
+ + " \"id\": \"bucket-name/object-name/1234567890123456\",\n"
+ + " \"selfLink\":
\"https://www.googleapis.com/storage/v1/b/bucket-name/o/object-name\",\n"
+ + " \"name\": \"object-name-1\",\n"
+ + " \"bucket\": \"bucket-1\",\n"
+ + " \"generation\": \"1234567890123456\",\n"
+ + " \"metageneration\": \"1\",\n"
+ + " \"contentType\": \"application/octet-stream\",\n"
+ + " \"timeCreated\": \"2023-07-09T10:15:30.000Z\",\n"
+ + " \"updated\": \"2023-07-09T10:15:30.000Z\",\n"
+ + " \"size\": \"1024\",\n"
+ + " \"md5Hash\": \"e4e68fb326b0d21a1bc7a12bb6b1e642\",\n"
+ + " \"crc32c\": \"AAAAAAAAAAA=\",\n"
+ + " \"etag\": \"CO2j+pDxx-ACEAE=\"\n"
+ + "}");
+ ReceivedMessage msg2 = fileCreateMessage("objectId-2", "{\n"
+ + " \"kind\": \"storage#object\",\n"
+ + " \"id\": \"bucket-name/object-name/1234567890123456\",\n"
+ + " \"selfLink\":
\"https://www.googleapis.com/storage/v1/b/bucket-name/o/object-name\",\n"
+ + " \"name\": \"object-name-2\",\n"
+ + " \"bucket\": \"bucket-2\",\n"
+ + " \"generation\": \"1234567890123456\",\n"
+ + " \"metageneration\": \"1\",\n"
+ + " \"contentType\": \"application/octet-stream\",\n"
+ + " \"timeCreated\": \"2023-07-09T10:15:30.000Z\",\n"
+ + " \"updated\": \"2023-07-09T10:15:30.000Z\",\n"
+ + " \"size\": \"1024\",\n"
+ + " \"md5Hash\": \"e4e68fb326b0d21a1bc7a12bb6b1e642\",\n"
+ + " \"crc32c\": \"AAAAAAAAAAA=\",\n"
+ + " \"etag\": \"CO2j+pDxx-ACEAE=\"\n"
+ + "}");
when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(msg1, msg2));
- GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, null,
+ GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, schemaProvider,
pubsubMessagesFetcher);
Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = source.fetchNextBatch(Option.of("0"), 100);
source.onCommit(dataAndCheckpoint.getRight());
@@ -101,8 +131,8 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
Dataset<Row> resultDs = dataAndCheckpoint.getLeft().get();
List<Row> result = resultDs.collectAsList();
- assertBucket(result.get(0), "bucket-1");
- assertBucket(result.get(1), "bucket-2");
+ assertEquals(result.get(0).getAs("bucket"), "bucket-1");
+ assertEquals(result.get(1).getAs("bucket"), "bucket-2");
verify(pubsubMessagesFetcher).fetchMessages();
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
index 02ab061109e..4db47c76784 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.testutils.sources.AbstractCloudObjectsSourceTestBase;
import org.apache.avro.generic.GenericRecord;
@@ -50,6 +51,7 @@ public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase {
this.dfsRoot = basePath + "/parquetFiles";
this.fileSuffix = ".parquet";
fs.mkdirs(new Path(dfsRoot));
+ schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS("delta-streamer-config", "s3-metadata.avsc"), jsc);
}
@AfterEach
@@ -100,7 +102,7 @@ public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase {
props.setProperty(S3_SOURCE_QUEUE_URL.key(), sqsUrl);
props.setProperty(S3_SOURCE_QUEUE_REGION.key(), regionName);
props.setProperty(S3_SOURCE_QUEUE_FS.key(), "hdfs");
- S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession, null);
+ S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession, schemaProvider);
dfsSource.sqs = this.sqs;
return dfsSource;
}
diff --git a/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc b/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc
index 79baf5eb80d..3e190932dc1 100644
--- a/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc
+++ b/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc
@@ -22,11 +22,13 @@
"fields": [
{
"name": "_row_key",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "partition_path",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "timestamp",
@@ -34,75 +36,93 @@
},
{
"name": "bucket",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "contentLanguage",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "contentType",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "crc32c",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "etag",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "generation",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "id",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "kind",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "md5Hash",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "mediaLink",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "metageneration",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "name",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "selfLink",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "size",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "storageClass",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "timeCreated",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "timeStorageClassUpdated",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "updated",
- "type": "string"
+ "type": ["null", "string"],
+ "default": null
}
]
}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc
new file mode 100644
index 00000000000..64b169c1373
--- /dev/null
+++ b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "type": "record",
+ "name": "hoodie_source",
+ "namespace": "hoodie.source",
+ "fields": [
+ {
+ "name": "awsRegion",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "eventName",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "eventSource",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "eventTime",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "eventVersion",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "requestParameters",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "requestParameters",
+ "namespace": "hoodie.source.hoodie_source",
+ "fields": [
+ {
+ "name": "sourceIPAddress",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "s3",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "s3",
+ "namespace": "hoodie.source.hoodie_source",
+ "fields": [
+ {
+ "name": "bucket",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "bucket",
+ "namespace": "hoodie.source.hoodie_source.s3",
+ "fields": [
+ {
+ "name": "arn",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "name",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "ownerIdentity",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "ownerIdentity",
+ "namespace": "hoodie.source.hoodie_source.s3.bucket",
+ "fields": [
+ {
+ "name": "principalId",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "configurationId",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "object",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "object",
+ "namespace": "hoodie.source.hoodie_source.s3",
+ "fields": [
+ {
+ "name": "eTag",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "key",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "sequencer",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "size",
+ "type": ["null", "long"],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "s3SchemaVersion",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "userIdentity",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "userIdentity",
+ "namespace": "hoodie.source.hoodie_source",
+ "fields": [
+ {
+ "name": "principalId",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ }
+ ]
+}
\ No newline at end of file