This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6409fc7  [HUDI-2374] Fixing AvroDFSSource does not use the overridden schema to deserialize Avro binaries (#4353)
6409fc7 is described below

commit 6409fc733d27eea6ebc71551affdf5aec4bb2fd5
Author: harshal <[email protected]>
AuthorDate: Tue Dec 28 09:31:21 2021 +0530

    [HUDI-2374] Fixing AvroDFSSource does not use the overridden schema to deserialize Avro binaries (#4353)
---
 .../hudi/utilities/sources/AvroDFSSource.java      |  1 +
 .../hudi/utilities/sources/TestAvroDFSSource.java  | 56 ++++++++++++++++++++++
 .../utilities/testutils/UtilitiesTestBase.java     | 17 +++++++
 3 files changed, 74 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index aed6c6b..4e80009 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -45,6 +45,7 @@ public class AvroDFSSource extends AvroSource {
   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) throws IOException {
     super(props, sparkContext, sparkSession, schemaProvider);
+    sparkContext.hadoopConfiguration().set("avro.schema.input.key", schemaProvider.getSourceSchema().toString());
     this.pathSelector = DFSPathSelector
         .createSourceSelector(props, sparkContext.hadoopConfiguration());
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
new file mode 100644
index 0000000..37abaa5
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Basic tests for {@link AvroDFSSource}.
+ */
+public class TestAvroDFSSource extends AbstractDFSSourceTestBase {
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+    this.dfsRoot = dfsBasePath + "/avroFiles";
+    this.fileSuffix = ".avro";
+  }
+
+  @Override
+  protected Source prepareDFSSource() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    try {
+      return new AvroDFSSource(props, jsc, sparkSession, schemaProvider);
+    } catch (IOException e) {
+      throw new java.io.UncheckedIOException("Failed to create AvroDFSSource for " + dfsRoot, e);
+    }
+  }
+
+  @Override
+  protected void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
+    Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index bb00d2f..90a3f5a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -48,6 +48,8 @@ import com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import com.fasterxml.jackson.dataformat.csv.CsvSchema;
 import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
@@ -78,6 +80,7 @@ import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -341,6 +344,20 @@ public class UtilitiesTestBase {
       }
     }
 
+    public static void saveAvroToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+      saveAvroToDFS(records, targetFile, HoodieTestDataGenerator.AVRO_SCHEMA);
+    }
+
+    public static void saveAvroToDFS(List<GenericRecord> records, Path targetFile, Schema schema) throws IOException {
+      FileSystem fs = targetFile.getFileSystem(HoodieTestUtils.getDefaultHadoopConf());
+      try (OutputStream output = fs.create(targetFile);
+          DataFileWriter<IndexedRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<IndexedRecord>(schema)).create(schema, output)) {
+        for (GenericRecord record : records) {
+          dataFileWriter.append(record);
+        }
+      }
+    }
+
     public static TypedProperties setupSchemaOnDFS() throws IOException {
       return setupSchemaOnDFS("delta-streamer-config", "source.avsc");
     }

Reply via email to