This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new ace55a93e58 [HUDI-7785] Keep public APIs in utilities module the same as before HoodieStorage abstraction (#11280)
ace55a93e58 is described below
commit ace55a93e587579ee53206b66498feec4dcc4d92
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu May 23 23:25:27 2024 -0700
[HUDI-7785] Keep public APIs in utilities module the same as before HoodieStorage abstraction (#11280)
---
.../hudi/utilities/deltastreamer/DeltaSync.java | 3 +-
.../deltastreamer/HoodieDeltaStreamer.java | 5 +-
.../utilities/streamer/BaseErrorTableWriter.java | 13 ++--
.../hudi/utilities/streamer/ErrorTableUtils.java | 5 +-
.../hudi/utilities/streamer/HoodieStreamer.java | 41 ++++++-------
.../apache/hudi/utilities/streamer/StreamSync.java | 8 +--
...TestHoodieDeltaStreamerSchemaEvolutionBase.java | 6 +-
.../utilities/sources/TestJsonKafkaSource.java | 2 +-
.../utilities/streamer/TestErrorTableUtils.java | 70 ++++++++++++++++++++++
9 files changed, 111 insertions(+), 42 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c8a1b47b9fb..24fa9b97051 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
@@ -53,6 +52,6 @@ public class DeltaSync extends StreamSync {
TypedProperties props, HoodieSparkEngineContext
hoodieSparkContext, FileSystem fs, Configuration conf,
Function<SparkRDDWriteClient, Boolean>
onInitializingHoodieWriteClient) throws IOException {
super(cfg, sparkSession, props, hoodieSparkContext,
- new HoodieHadoopStorage(fs), conf, onInitializingHoodieWriteClient,
new DefaultStreamContext(schemaProvider, Option.empty()));
+ fs, conf, onInitializingHoodieWriteClient, new
DefaultStreamContext(schemaProvider, Option.empty()));
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 6c5cca9888e..8d941886a08 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.hadoop.conf.Configuration;
@@ -51,7 +50,7 @@ public class HoodieDeltaStreamer extends HoodieStreamer {
JavaSparkContext jssc,
FileSystem fs,
Configuration conf) throws IOException {
- super(cfg, jssc, new HoodieHadoopStorage(fs), conf);
+ super(cfg, jssc, fs, conf);
}
public HoodieDeltaStreamer(Config cfg,
@@ -59,7 +58,7 @@ public class HoodieDeltaStreamer extends HoodieStreamer {
FileSystem fs,
Configuration conf,
Option<TypedProperties> propsOverride) throws
IOException {
- super(cfg, jssc, new HoodieHadoopStorage(fs), conf, propsOverride);
+ super(cfg, jssc, fs, conf, propsOverride);
}
@Deprecated
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
index b9d18dbd916..5c25d68d2c4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
@@ -19,13 +19,16 @@
package org.apache.hudi.utilities.streamer;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
@@ -40,6 +43,7 @@ import java.io.Serializable;
*
* The writer can use the configs defined in HoodieErrorTableConfig to manage
the error table.
*/
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements
Serializable {
// The column name passed to Spark for option `columnNameOfCorruptRecord`.
The record
@@ -47,8 +51,7 @@ public abstract class BaseErrorTableWriter<T extends
ErrorEvent> implements Seri
public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
public BaseErrorTableWriter(HoodieStreamer.Config cfg, SparkSession
sparkSession,
- TypedProperties props,
- HoodieSparkEngineContext hoodieSparkContext,
HoodieStorage storage) {
+ TypedProperties props, HoodieSparkEngineContext
hoodieSparkContext, FileSystem fileSystem) {
}
/**
@@ -57,18 +60,20 @@ public abstract class BaseErrorTableWriter<T extends
ErrorEvent> implements Seri
*
* @param errorEvent Input error event RDD
*/
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract void addErrorEvents(JavaRDD<T> errorEvent);
/**
* Fetches the error events RDD processed by the writer so far. This is a
test API.
*/
@VisibleForTesting
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String
baseTableInstantTime, Option<String> commitedInstantTime);
/**
* This API is called to commit the error events (failed Hoodie Records)
processed by the writer so far.
* These records are committed to a error table.
*/
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract boolean upsertAndCommit(String baseTableInstantTime,
Option<String> commitedInstantTime);
-
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
index fce14d18807..9d9de91fbba 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.storage.HoodieStorage;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.sql.Dataset;
@@ -48,7 +47,7 @@ public final class ErrorTableUtils {
SparkSession
sparkSession,
TypedProperties props,
HoodieSparkEngineContext hoodieSparkContext,
- HoodieStorage
storage) {
+ FileSystem
fileSystem) {
String errorTableWriterClass =
props.getString(ERROR_TABLE_WRITE_CLASS.key());
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
"Missing error table config " + ERROR_TABLE_WRITE_CLASS);
@@ -65,7 +64,7 @@ public final class ErrorTableUtils {
try {
return Option.of((BaseErrorTableWriter)
ReflectionUtils.getClass(errorTableWriterClass)
.getConstructor(argClassArr)
- .newInstance(cfg, sparkSession, props, hoodieSparkContext, storage));
+ .newInstance(cfg, sparkSession, props, hoodieSparkContext,
fileSystem));
} catch (NoSuchMethodException | InvocationTargetException |
InstantiationException
| IllegalAccessException e) {
throw new HoodieException(errMsg, e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 5af958d108b..4ea84ff7a5e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -58,8 +58,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -132,26 +132,24 @@ public class HoodieStreamer implements Serializable {
public static final String STREAMSYNC_POOL_NAME = "hoodiedeltasync";
public HoodieStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
- this(cfg, jssc,
- HoodieStorageUtils.getStorage(cfg.targetBasePath,
HadoopFSUtils.getStorageConf(jssc.hadoopConfiguration())),
+ this(cfg, jssc, HadoopFSUtils.getFs(cfg.targetBasePath,
jssc.hadoopConfiguration()),
jssc.hadoopConfiguration(), Option.empty());
}
public HoodieStreamer(Config cfg, JavaSparkContext jssc,
Option<TypedProperties> props) throws IOException {
- this(cfg, jssc,
- HoodieStorageUtils.getStorage(cfg.targetBasePath,
HadoopFSUtils.getStorageConf(jssc.hadoopConfiguration())),
+ this(cfg, jssc, HadoopFSUtils.getFs(cfg.targetBasePath,
jssc.hadoopConfiguration()),
jssc.hadoopConfiguration(), props);
}
- public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage
storage, Configuration conf) throws IOException {
- this(cfg, jssc, storage, conf, Option.empty());
+ public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs,
Configuration conf) throws IOException {
+ this(cfg, jssc, fs, conf, Option.empty());
}
- public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage
storage, Configuration conf, Option<TypedProperties> propsOverride) throws
IOException {
- this(cfg, jssc, storage, conf, propsOverride, Option.empty());
+ public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs,
Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
+ this(cfg, jssc, fs, conf, propsOverride, Option.empty());
}
- public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage
storage, Configuration conf,
+ public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs,
Configuration conf,
Option<TypedProperties> propsOverride,
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
this.properties = combineProperties(cfg, propsOverride,
jssc.hadoopConfiguration());
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
@@ -163,11 +161,10 @@ public class HoodieStreamer implements Serializable {
this.cfg = cfg;
this.bootstrapExecutor = Option.ofNullable(
- cfg.runBootstrap ? new BootstrapExecutor(
- cfg, jssc, (FileSystem) storage.getFileSystem(), conf,
this.properties) : null);
+ cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf,
this.properties) : null);
HoodieSparkEngineContext sparkEngineContext = new
HoodieSparkEngineContext(jssc);
this.ingestionService = Option.ofNullable(
- cfg.runBootstrap ? null : new StreamSyncService(cfg,
sparkEngineContext, storage, conf, Option.ofNullable(this.properties),
sourceProfileSupplier));
+ cfg.runBootstrap ? null : new StreamSyncService(cfg,
sparkEngineContext, fs, conf, Option.ofNullable(this.properties),
sourceProfileSupplier));
}
private static TypedProperties combineProperties(Config cfg,
Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -672,14 +669,14 @@ public class HoodieStreamer implements Serializable {
private final Option<ConfigurationHotUpdateStrategy>
configurationHotUpdateStrategyOpt;
public StreamSyncService(Config cfg, HoodieSparkEngineContext
hoodieSparkContext,
- HoodieStorage storage, Configuration conf,
+ FileSystem fs, Configuration conf,
Option<TypedProperties> properties,
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
super(HoodieIngestionConfig.newBuilder()
.isContinuous(cfg.continuousMode)
.withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
this.cfg = cfg;
this.hoodieSparkContext = hoodieSparkContext;
- this.storage = storage;
+ this.storage = new HoodieHadoopStorage(fs);
this.hiveConf = conf;
this.sparkSession =
SparkSession.builder().config(hoodieSparkContext.getConf()).getOrCreate();
this.asyncCompactService = Option.empty();
@@ -732,20 +729,19 @@ public class HoodieStreamer implements Serializable {
props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext,
- this.storage, conf, this::onInitializingWriteClient, new
DefaultStreamContext(schemaProvider, sourceProfileSupplier));
-
+ fs, conf, this::onInitializingWriteClient, new
DefaultStreamContext(schemaProvider, sourceProfileSupplier));
}
public StreamSyncService(HoodieStreamer.Config cfg,
- HoodieSparkEngineContext hoodieSparkContext,
HoodieStorage storage,
+ HoodieSparkEngineContext hoodieSparkContext,
FileSystem fs,
Configuration conf)
throws IOException {
- this(cfg, hoodieSparkContext, storage, conf, Option.empty(),
Option.empty());
+ this(cfg, hoodieSparkContext, fs, conf, Option.empty(), Option.empty());
}
- public StreamSyncService(HoodieStreamer.Config cfg,
HoodieSparkEngineContext hoodieSparkContext, HoodieStorage storage,
Configuration conf, Option<TypedProperties> properties)
+ public StreamSyncService(HoodieStreamer.Config cfg,
HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
Option<TypedProperties> properties)
throws IOException {
- this(cfg, hoodieSparkContext, storage, conf, properties, Option.empty());
+ this(cfg, hoodieSparkContext, fs, conf, properties, Option.empty());
}
private void initializeTableTypeAndBaseFileFormat() {
@@ -760,7 +756,8 @@ public class HoodieStreamer implements Serializable {
streamSync.close();
}
streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext,
- storage, hiveConf, this::onInitializingWriteClient, new
DefaultStreamContext(schemaProvider, Option.empty()));
+ (FileSystem) storage.getFileSystem(), hiveConf,
this::onInitializingWriteClient,
+ new DefaultStreamContext(schemaProvider, Option.empty()));
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 20e530c2ee7..d5405645782 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -292,18 +292,18 @@ public class StreamSync implements Serializable,
Closeable {
TypedProperties props, JavaSparkContext jssc, FileSystem
fs, Configuration conf,
Function<SparkRDDWriteClient, Boolean>
onInitializingHoodieWriteClient) throws IOException {
this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc),
- new HoodieHadoopStorage(fs), conf, onInitializingHoodieWriteClient,
+ fs, conf, onInitializingHoodieWriteClient,
new DefaultStreamContext(schemaProvider, Option.empty()));
}
public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
TypedProperties props, HoodieSparkEngineContext
hoodieSparkContext,
- HoodieStorage storage, Configuration conf,
+ FileSystem fs, Configuration conf,
Function<SparkRDDWriteClient, Boolean>
onInitializingHoodieWriteClient, StreamContext streamContext) throws
IOException {
this.cfg = cfg;
this.hoodieSparkContext = hoodieSparkContext;
this.sparkSession = sparkSession;
- this.storage = storage;
+ this.storage = new HoodieHadoopStorage(fs);
this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
this.props = props;
this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
@@ -319,7 +319,7 @@ public class StreamSync implements Serializable, Closeable {
this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig,
storage.getConf());
if (props.getBoolean(ERROR_TABLE_ENABLED.key(),
ERROR_TABLE_ENABLED.defaultValue())) {
this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(
- cfg, sparkSession, props, hoodieSparkContext, storage);
+ cfg, sparkSession, props, hoodieSparkContext, fs);
this.errorWriteFailureStrategy =
ErrorTableUtils.getErrorWriteFailureStrategy(props);
}
refreshTimeline();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index c6f2afc2ef7..e4670799830 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
-import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
@@ -42,6 +41,7 @@ import org.apache.hudi.utilities.streamer.HoodieStreamer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -327,8 +327,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
public static Map<String,Option<JavaRDD>> commited = new HashMap<>();
public TestErrorTable(HoodieStreamer.Config cfg, SparkSession
sparkSession, TypedProperties props, HoodieSparkEngineContext
hoodieSparkContext,
- HoodieStorage storage) {
- super(cfg, sparkSession, props, hoodieSparkContext, storage);
+ FileSystem fileSystem) {
+ super(cfg, sparkSession, props, hoodieSparkContext, fileSystem);
}
@Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 92238721fcd..90d8543a5db 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -296,7 +296,7 @@ public class TestJsonKafkaSource extends
BaseTestKafkaSource {
private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties
props) {
return new BaseErrorTableWriter<ErrorEvent<String>>(new
HoodieDeltaStreamer.Config(),
- spark(), props, new HoodieSparkEngineContext(jsc()), hoodieStorage()) {
+ spark(), props, new HoodieSparkEngineContext(jsc()), fs()) {
List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList();
@Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java
new file mode 100644
index 00000000000..1a3af25cd59
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamerSchemaEvolutionBase.TestErrorTable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link ErrorTableUtils}.
+ */
+public class TestErrorTableUtils {
+ @Test
+ public void testGetErrorTableWriter() {
+ SparkSession sparkSession = Mockito.mock(SparkSession.class);
+ HoodieSparkEngineContext sparkContext =
Mockito.mock(HoodieSparkEngineContext.class);
+ FileSystem fileSystem = Mockito.mock(FileSystem.class);
+
+ TypedProperties props = new TypedProperties();
+ // No error table writer config
+ assertThrows(IllegalArgumentException.class,
+ () -> ErrorTableUtils.getErrorTableWriter(
+ new HoodieStreamer.Config(), sparkSession, props, sparkContext,
fileSystem));
+
+ // Empty error table writer config
+ props.put("hoodie.errortable.write.class", StringUtils.EMPTY_STRING);
+ assertThrows(IllegalStateException.class,
+ () -> ErrorTableUtils.getErrorTableWriter(
+ new HoodieStreamer.Config(), sparkSession, props, sparkContext,
fileSystem));
+
+ // Proper error table writer config
+ props.put("hoodie.errortable.write.class", TestErrorTable.class.getName());
+ assertTrue(ErrorTableUtils.getErrorTableWriter(
+ new HoodieStreamer.Config(), sparkSession, props, sparkContext,
fileSystem).get() instanceof TestErrorTable);
+
+ // Wrong error table writer config
+ props.put("hoodie.errortable.write.class", HoodieConfig.class.getName());
+ assertThrows(HoodieException.class,
+ () -> ErrorTableUtils.getErrorTableWriter(
+ new HoodieStreamer.Config(), sparkSession, props, sparkContext,
fileSystem));
+ }
+}