This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6409fc7 [HUDI-2374] Fixing AvroDFSSource does not use the overridden
schema to deserialize Avro binaries (#4353)
6409fc7 is described below
commit 6409fc733d27eea6ebc71551affdf5aec4bb2fd5
Author: harshal <[email protected]>
AuthorDate: Tue Dec 28 09:31:21 2021 +0530
[HUDI-2374] Fixing AvroDFSSource does not use the overridden schema to
deserialize Avro binaries (#4353)
---
.../hudi/utilities/sources/AvroDFSSource.java | 1 +
.../hudi/utilities/sources/TestAvroDFSSource.java | 56 ++++++++++++++++++++++
.../utilities/testutils/UtilitiesTestBase.java | 17 +++++++
3 files changed, 74 insertions(+)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index aed6c6b..4e80009 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -45,6 +45,7 @@ public class AvroDFSSource extends AvroSource {
public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext,
SparkSession sparkSession,
SchemaProvider schemaProvider) throws IOException {
super(props, sparkContext, sparkSession, schemaProvider);
+ sparkContext.hadoopConfiguration().set("avro.schema.input.key",
schemaProvider.getSourceSchema().toString());
this.pathSelector = DFSPathSelector
.createSourceSelector(props, sparkContext.hadoopConfiguration());
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
new file mode 100644
index 0000000..37abaa5
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;

import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

/**
 * Basic tests for {@link AvroDFSSource}.
 */
public class TestAvroDFSSource extends AbstractDFSSourceTestBase {

  @BeforeEach
  public void setup() throws Exception {
    super.setup();
    // Point the shared DFS-source test harness at Avro fixture files.
    this.dfsRoot = dfsBasePath + "/avroFiles";
    this.fileSuffix = ".avro";
  }

  @Override
  protected Source prepareDFSSource() {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
    try {
      return new AvroDFSSource(props, jsc, sparkSession, schemaProvider);
    } catch (IOException e) {
      // Fail loudly instead of returning null, which would otherwise surface
      // later as an opaque NullPointerException in the test harness.
      throw new UncheckedIOException("Failed to instantiate AvroDFSSource", e);
    }
  }

  @Override
  protected void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
    Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path);
  }
}
\ No newline at end of file
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index bb00d2f..90a3f5a 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -48,6 +48,8 @@ import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
@@ -78,6 +80,7 @@ import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
@@ -341,6 +344,20 @@ public class UtilitiesTestBase {
}
}
+ public static void saveAvroToDFS(List<GenericRecord> records, Path
targetFile) throws IOException {
+ saveAvroToDFS(records,targetFile,HoodieTestDataGenerator.AVRO_SCHEMA);
+ }
+
+ public static void saveAvroToDFS(List<GenericRecord> records, Path
targetFile, Schema schema) throws IOException {
+ FileSystem fs =
targetFile.getFileSystem(HoodieTestUtils.getDefaultHadoopConf());
+ OutputStream output = fs.create(targetFile);
+ try (DataFileWriter<IndexedRecord> dataFileWriter = new
DataFileWriter<>(new GenericDatumWriter(schema)).create(schema, output)) {
+ for (GenericRecord record : records) {
+ dataFileWriter.append(record);
+ }
+ }
+ }
+
public static TypedProperties setupSchemaOnDFS() throws IOException {
return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
}