This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 473cf9a8284 [HUDI-7138] Fix error table writer and schema registry provider (#10173)
473cf9a8284 is described below

commit 473cf9a8284fa9e59d1509ae1d5b1a43afc7ea7b
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Wed Nov 29 08:37:40 2023 -0800

    [HUDI-7138] Fix error table writer and schema registry provider (#10173)
    
    
    ---------
    
    Co-authored-by: rmahindra123 <[email protected]>
---
 .../main/scala/org/apache/hudi/HoodieConversionUtils.scala |  7 ++-----
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala  |  2 +-
 .../hudi/utilities/sources/S3EventsHoodieIncrSource.java   |  3 +--
 .../hudi/utilities/streamer/BaseErrorTableWriter.java      |  4 +++-
 .../hudi/utilities/schema/TestSchemaRegistryProvider.java  | 14 +++++++++++---
 5 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
index 62a315b85a0..23efce82984 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
@@ -21,8 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.common.config.TypedProperties
 
 import java.{util => ju}
-import scala.collection.JavaConverters
-import scala.jdk.CollectionConverters.dictionaryAsScalaMapConverter
+import scala.collection.JavaConverters._
 
 object HoodieConversionUtils {
 
@@ -49,9 +48,7 @@ object HoodieConversionUtils {
   }
 
   def fromProperties(props: TypedProperties): Map[String, String] = {
-    props.asScala.map {
-      case (k, v) => (k.toString, v.toString)
-    }.toMap
+    props.asScala.toMap
   }
 
 }
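
Note on the change above: TypedProperties extends java.util.Properties, for which JavaConverters supplies a Properties-specific converter, so asScala already yields a mutable.Map[String, String] and the per-entry toString mapping was redundant. A minimal, self-contained Scala sketch of the same conversion, using plain java.util.Properties as a stand-in for TypedProperties:

    import java.util.Properties
    import scala.collection.JavaConverters._

    object FromPropertiesSketch {
      // Properties resolves to the Properties-specific converter, so asScala
      // already gives a mutable.Map[String, String]; toMap makes it immutable.
      def fromProperties(props: Properties): Map[String, String] =
        props.asScala.toMap

      def main(args: Array[String]): Unit = {
        val p = new Properties()
        p.setProperty("hoodie.table.name", "trips")
        println(fromProperties(p)) // Map(hoodie.table.name -> trips)
      }
    }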
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7f28e190e95..b8dbb18287e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -148,7 +148,7 @@ object HoodieSparkSqlWriter {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          props: TypedProperties): Schema = {
-    deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, props.toMap)
+    deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, HoodieConversionUtils.fromProperties(props))
   }
 
   def cleanup(): Unit = {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 61ed02da106..3af87d49489 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -35,7 +35,6 @@ import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 
-import org.apache.parquet.Strings;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -141,7 +140,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 
     // This is to ensure backward compatibility where we were using the
     // config SOURCE_FILE_FORMAT for file format in previous versions.
-    this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
+    this.fileFormat = StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
         ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
         : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
 
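
The backward-compatibility comment above boils down to a fallback pattern: prefer the newer DATAFILE_FORMAT config and use the legacy SOURCE_FILE_FORMAT only when the new key is unset or empty. A sketch of that resolution logic in Scala; the key names and the plain Map are illustrative stand-ins for Hudi's TypedProperties and config constants:

    object FileFormatFallbackSketch {
      // Illustrative key names, not Hudi's actual constants.
      val NewKey    = "datafile.format"
      val LegacyKey = "source.file.format"

      def resolveFileFormat(props: Map[String, String]): String =
        props.get(NewKey).filter(_.nonEmpty)               // prefer the new config
          .orElse(props.get(LegacyKey).filter(_.nonEmpty)) // else the legacy config
          .getOrElse("parquet")                            // assumed default

      def main(args: Array[String]): Unit = {
        println(resolveFileFormat(Map(LegacyKey -> "json")))                  // json
        println(resolveFileFormat(Map(NewKey -> "orc", LegacyKey -> "json"))) // orc
      }
    }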
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
index e22942763a8..77a85831518 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.SparkSession;
 
+import java.io.Serializable;
+
 /**
  * The class which handles error events while processing write records. All the
  * records which have a processing/write failure are triggered as error events to
@@ -38,7 +40,7 @@ import org.apache.spark.sql.SparkSession;
  *
  * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
  */
-public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Serializable {
 
   // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
   // is set to this column in case of an error
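
Why implements Serializable matters here: an error table writer is typically captured in Spark closures, and any object a closure references must be serializable or the job fails at submission with "Task not serializable". A toy Scala illustration of that failure mode (not the actual BaseErrorTableWriter API):

    import org.apache.spark.sql.SparkSession

    // A class captured inside an RDD closure must be java.io.Serializable;
    // dropping `extends Serializable` below makes Spark throw
    // SparkException: Task not serializable.
    class TagWriter(val tag: String) extends Serializable {
      def label(s: String): String = s"$tag:$s"
    }

    object ClosureSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        val writer = new TagWriter("err")
        // `writer` is shipped to executors as part of this closure.
        val out = spark.sparkContext.parallelize(Seq("a", "b")).map(writer.label).collect()
        out.foreach(println) // err:a, err:b
        spark.stop()
      }
    }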
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 80f40033a3e..abbe983cbce 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -46,11 +46,18 @@ class TestSchemaRegistryProvider {
  private static final String REGISTRY_RESPONSE = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", "
      + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": "
      + "\\\"string\\\" }]}\"}";
+  private static final String RAW_SCHEMA = "{\"type\": \"record\", \"namespace\": \"example\", "
+      + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
+      + "\"string\" }]}";
  private static final String CONVERTED_SCHEMA = "{\"type\": \"record\", \"namespace\": \"com.example.hoodie\", "
      + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
      + "\"string\" }]}";
 
   private static Schema getExpectedSchema() {
+    return new Schema.Parser().parse(RAW_SCHEMA);
+  }
+
+  private static Schema getExpectedConvertedSchema() {
     return new Schema.Parser().parse(CONVERTED_SCHEMA);
   }
 
@@ -60,7 +67,6 @@ class TestSchemaRegistryProvider {
         put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://"; 
+ BASIC_AUTH + "@localhost");
         put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", 
"-value");
         put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://foo:bar@localhost";);
-        put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
         put("hoodie.deltastreamer.source.kafka.topic", "foo");
       }
     };
@@ -97,10 +103,11 @@ class TestSchemaRegistryProvider {
  public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException {
     TypedProperties props = getProps();
     props.put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://localhost";);
+    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getSourceSchema();
     assertNotNull(actual);
-    assertEquals(getExpectedSchema(), actual);
+    assertEquals(getExpectedConvertedSchema(), actual);
    verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
   }
 
@@ -108,10 +115,11 @@ class TestSchemaRegistryProvider {
  public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException {
     TypedProperties props = getProps();
     props.put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://localhost";);
+    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", 
DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getTargetSchema();
     assertNotNull(actual);
-    assertEquals(getExpectedSchema(), actual);
+    assertEquals(getExpectedConvertedSchema(), actual);
    verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
   }
 

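For context on the test changes: the schemaconverter config moved out of the shared getProps() into the two tests, which now assert against getExpectedConvertedSchema(). DummySchemaConverter's implementation is not part of this diff, but judging from RAW_SCHEMA versus CONVERTED_SCHEMA it rewrites the record namespace from "example" to "com.example.hoodie". A hypothetical Scala sketch of such a rewrite using Avro's Schema API:

    import org.apache.avro.Schema
    import scala.collection.JavaConverters._

    // Hypothetical namespace-rewriting converter, mirroring what the test's
    // DummySchemaConverter appears to do (its source is not in this diff).
    object NamespaceRewriteSketch {
      def rewriteNamespace(schema: Schema, newNamespace: String): Schema = {
        // Field objects cannot be shared between schemas, so copy each one.
        val fields = schema.getFields.asScala.map { f =>
          new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())
        }.asJava
        Schema.createRecord(schema.getName, schema.getDoc, newNamespace, schema.isError, fields)
      }

      def main(args: Array[String]): Unit = {
        val raw = new Schema.Parser().parse(
          """{"type": "record", "namespace": "example", "name": "FullName",
            |"fields": [{ "name": "first", "type": "string" }]}""".stripMargin)
        println(rewriteNamespace(raw, "com.example.hoodie").getFullName) // com.example.hoodie.FullName
      }
    }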