This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 473cf9a8284 [HUDI-7138] Fix error table writer and schema registry provider (#10173)
473cf9a8284 is described below
commit 473cf9a8284fa9e59d1509ae1d5b1a43afc7ea7b
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Wed Nov 29 08:37:40 2023 -0800
[HUDI-7138] Fix error table writer and schema registry provider (#10173)
---------
Co-authored-by: rmahindra123 <[email protected]>
---
.../main/scala/org/apache/hudi/HoodieConversionUtils.scala | 7 ++-----
.../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
.../hudi/utilities/sources/S3EventsHoodieIncrSource.java | 3 +--
.../hudi/utilities/streamer/BaseErrorTableWriter.java | 4 +++-
.../hudi/utilities/schema/TestSchemaRegistryProvider.java | 14 +++++++++++---
5 files changed, 18 insertions(+), 12 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
index 62a315b85a0..23efce82984 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
@@ -21,8 +21,7 @@ package org.apache.hudi
import org.apache.hudi.common.config.TypedProperties
import java.{util => ju}
-import scala.collection.JavaConverters
-import scala.jdk.CollectionConverters.dictionaryAsScalaMapConverter
+import scala.collection.JavaConverters._
object HoodieConversionUtils {
@@ -49,9 +48,7 @@ object HoodieConversionUtils {
}
def fromProperties(props: TypedProperties): Map[String, String] = {
- props.asScala.map {
- case (k, v) => (k.toString, v.toString)
- }.toMap
+ props.asScala.toMap
}
}
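
The import swap above matters because java.util.Properties is Object-typed through its Dictionary view, which is what the removed dictionaryAsScalaMapConverter exposed and why every entry had to be mapped through toString. The wildcard JavaConverters import instead picks up the Properties-specific converter, which is already String-typed, so asScala.toMap suffices. A minimal Java sketch of the two views (hypothetical snippet, not part of this commit; it only assumes TypedProperties extends java.util.Properties):

    import java.util.Properties;

    public class PropertiesViews {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("hoodie.table.name", "trips");
        // Hashtable/Dictionary view: Object-typed, needs toString per entry
        Object raw = props.get("hoodie.table.name");
        // String-specific view: already typed, no per-entry conversion needed
        String typed = props.getProperty("hoodie.table.name");
        System.out.println(raw + " / " + typed);
      }
    }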
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7f28e190e95..b8dbb18287e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -148,7 +148,7 @@ object HoodieSparkSqlWriter {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
props: TypedProperties): Schema = {
- deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, props.toMap)
+ deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, HoodieConversionUtils.fromProperties(props))
}
def cleanup(): Unit = {
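
The one-line change above routes the conversion through the shared fromProperties helper instead of an ad-hoc toMap, so the writer-schema path gets the same String-to-String map as other callers. A hedged Java-side sketch of the shape that helper produces (hypothetical illustration, not the Scala code itself):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class PropsToMap {
      // A String-to-String snapshot of every property, including defaults,
      // mirroring what the Scala helper hands to deduceWriterSchema.
      static Map<String, String> fromProperties(Properties props) {
        Map<String, String> out = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
          out.put(name, props.getProperty(name));
        }
        return out;
      }
    }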
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 61ed02da106..3af87d49489 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -35,7 +35,6 @@ import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
-import org.apache.parquet.Strings;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -141,7 +140,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
// This is to ensure backward compatibility where we were using the
// config SOURCE_FILE_FORMAT for file format in previous versions.
- this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
+ this.fileFormat = StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
: getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
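
The swap above replaces org.apache.parquet.Strings with Hudi's own StringUtils, dropping the reliance on a transitive Parquet helper class while keeping the fallback intact: prefer DATAFILE_FORMAT, and only fall back to the legacy SOURCE_FILE_FORMAT when it is unset or empty. A standalone restatement of that fallback (sketch only; the null-or-empty check mirrors StringUtils.isNullOrEmpty):

    public class FileFormatFallback {
      // Hypothetical helper: newer DATAFILE_FORMAT wins when set; otherwise
      // the legacy SOURCE_FILE_FORMAT is used for backward compatibility.
      static String chooseFileFormat(String dataFileFormat, String legacySourceFileFormat) {
        return (dataFileFormat == null || dataFileFormat.isEmpty())
            ? legacySourceFileFormat
            : dataFileFormat;
      }
    }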
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
index e22942763a8..77a85831518 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
+import java.io.Serializable;
+
/**
* The class which handles error events while processing write records. All the
* records which have a processing/write failure are triggered as error events to
@@ -38,7 +40,7 @@ import org.apache.spark.sql.SparkSession;
*
* The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
*/
-public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Serializable {
// The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
// is set to this column in case of an error
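
Implementing Serializable above is what lets an error-table writer ride along in objects that Spark serializes and ships to executors; without it, any closure or field capturing the writer fails at runtime. A self-contained sketch of the check Java serialization performs (hypothetical helper, not part of this commit):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class SerializationCheck {
      // Round-trips an object through Java serialization; a non-Serializable
      // field anywhere in the object graph surfaces as NotSerializableException,
      // the same failure Spark reports when shipping closures to executors.
      static byte[] roundTrip(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(obj);
        }
        return bytes.toByteArray();
      }
    }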
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 80f40033a3e..abbe983cbce 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -46,11 +46,18 @@ class TestSchemaRegistryProvider {
   private static final String REGISTRY_RESPONSE = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", "
       + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": "
       + "\\\"string\\\" }]}\"}";
+  private static final String RAW_SCHEMA = "{\"type\": \"record\", \"namespace\": \"example\", "
+      + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
+      + "\"string\" }]}";
   private static final String CONVERTED_SCHEMA = "{\"type\": \"record\", \"namespace\": \"com.example.hoodie\", "
       + "\"name\": \"FullName\",\"fields\": [{ \"name\": \"first\", \"type\": "
       + "\"string\" }]}";
private static Schema getExpectedSchema() {
+ return new Schema.Parser().parse(RAW_SCHEMA);
+ }
+
+ private static Schema getExpectedConvertedSchema() {
return new Schema.Parser().parse(CONVERTED_SCHEMA);
}
@@ -60,7 +67,6 @@ class TestSchemaRegistryProvider {
put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://"
+ BASIC_AUTH + "@localhost");
put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix",
"-value");
put("hoodie.deltastreamer.schemaprovider.registry.url",
"http://foo:bar@localhost");
- put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter",
DummySchemaConverter.class.getName());
put("hoodie.deltastreamer.source.kafka.topic", "foo");
}
};
@@ -97,10 +103,11 @@ class TestSchemaRegistryProvider {
public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException {
TypedProperties props = getProps();
props.put("hoodie.deltastreamer.schemaprovider.registry.url",
"http://localhost");
+ props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter",
DummySchemaConverter.class.getName());
SchemaRegistryProvider spyUnderTest = getUnderTest(props);
Schema actual = spyUnderTest.getSourceSchema();
assertNotNull(actual);
- assertEquals(getExpectedSchema(), actual);
+ assertEquals(getExpectedConvertedSchema(), actual);
verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
}
@@ -108,10 +115,11 @@ class TestSchemaRegistryProvider {
public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException {
TypedProperties props = getProps();
props.put("hoodie.deltastreamer.schemaprovider.registry.url",
"http://localhost");
+ props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter",
DummySchemaConverter.class.getName());
SchemaRegistryProvider spyUnderTest = getUnderTest(props);
Schema actual = spyUnderTest.getTargetSchema();
assertNotNull(actual);
- assertEquals(getExpectedSchema(), actual);
+ assertEquals(getExpectedConvertedSchema(), actual);
verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any());
}
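
The test changes above separate the schema as served by the registry (RAW_SCHEMA, namespace "example") from the schema after conversion (CONVERTED_SCHEMA, namespace "com.example.hoodie"), and register DummySchemaConverter only in the tests that assert on the converted result. A hypothetical converter with the effect these constants imply (the real converter interface lives in SchemaRegistryProvider; this sketch only assumes a string-in, string-out transform):

    public class NamespaceRewritingConverter {
      // Hypothetical: rewrites the registry schema's namespace to the one the
      // converted-schema constant expects, mirroring RAW_SCHEMA -> CONVERTED_SCHEMA.
      static String convert(String rawSchema) {
        return rawSchema.replace(
            "\"namespace\": \"example\"",
            "\"namespace\": \"com.example.hoodie\"");
      }
    }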