This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.15.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b71da938ac5ce7913965a2e7dfb865cc40e901b5
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu May 23 23:25:27 2024 -0700

    [HUDI-7785] Keep public APIs in utilities module the same as before HoodieStorage abstraction (#11280)
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  3 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  5 +-
 .../utilities/streamer/BaseErrorTableWriter.java   | 13 ++--
 .../hudi/utilities/streamer/ErrorTableUtils.java   |  5 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    | 41 ++++++-------
 .../apache/hudi/utilities/streamer/StreamSync.java |  8 +--
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |  6 +-
 .../utilities/sources/TestJsonKafkaSource.java     |  2 +-
 .../utilities/streamer/TestErrorTableUtils.java    | 70 ++++++++++++++++++++++
 9 files changed, 111 insertions(+), 42 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c8a1b47b9fb..24fa9b97051 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
@@ -53,6 +52,6 @@ public class DeltaSync extends StreamSync {
                    TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext, FileSystem fs, Configuration conf,
                    Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient) throws IOException {
     super(cfg, sparkSession, props, hoodieSparkContext,
-        new HoodieHadoopStorage(fs), conf, onInitializingHoodieWriteClient, 
new DefaultStreamContext(schemaProvider, Option.empty()));
+        fs, conf, onInitializingHoodieWriteClient, new 
DefaultStreamContext(schemaProvider, Option.empty()));
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 6c5cca9888e..8d941886a08 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -51,7 +50,7 @@ public class HoodieDeltaStreamer extends HoodieStreamer {
                              JavaSparkContext jssc,
                              FileSystem fs,
                              Configuration conf) throws IOException {
-    super(cfg, jssc, new HoodieHadoopStorage(fs), conf);
+    super(cfg, jssc, fs, conf);
   }
 
   public HoodieDeltaStreamer(Config cfg,
@@ -59,7 +58,7 @@ public class HoodieDeltaStreamer extends HoodieStreamer {
                              FileSystem fs,
                              Configuration conf,
                              Option<TypedProperties> propsOverride) throws 
IOException {
-    super(cfg, jssc, new HoodieHadoopStorage(fs), conf, propsOverride);
+    super(cfg, jssc, fs, conf, propsOverride);
   }
 
   @Deprecated
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
index b9d18dbd916..5c25d68d2c4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
@@ -19,13 +19,16 @@
 
 package org.apache.hudi.utilities.streamer;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.storage.HoodieStorage;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.SparkSession;
 
@@ -40,6 +43,7 @@ import java.io.Serializable;
  *
  * The writer can use the configs defined in HoodieErrorTableConfig to manage 
the error table.
  */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
 public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements 
Serializable {
 
   // The column name passed to Spark for option `columnNameOfCorruptRecord`. 
The record
@@ -47,8 +51,7 @@ public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Seri
   public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
 
   public BaseErrorTableWriter(HoodieStreamer.Config cfg, SparkSession 
sparkSession,
-                              TypedProperties props,
-                              HoodieSparkEngineContext hoodieSparkContext, 
HoodieStorage storage) {
+                              TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext, FileSystem fileSystem) {
   }
 
   /**
@@ -57,18 +60,20 @@ public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Seri
    *
    * @param errorEvent Input error event RDD
    */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract void addErrorEvents(JavaRDD<T> errorEvent);
 
   /**
    * Fetches the error events RDD processed by the writer so far. This is a 
test API.
    */
   @VisibleForTesting
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String 
baseTableInstantTime, Option<String> commitedInstantTime);
 
   /**
    * This API is called to commit the error events (failed Hoodie Records) 
processed by the writer so far.
    * These records are committed to a error table.
    */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract boolean upsertAndCommit(String baseTableInstantTime, 
Option<String> commitedInstantTime);
-
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
index fce14d18807..9d9de91fbba 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.storage.HoodieStorage;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.sql.Dataset;
@@ -48,7 +47,7 @@ public final class ErrorTableUtils {
                                                                  SparkSession 
sparkSession,
                                                                  
TypedProperties props,
                                                                  
HoodieSparkEngineContext hoodieSparkContext,
-                                                                 HoodieStorage 
storage) {
+                                                                 FileSystem 
fileSystem) {
     String errorTableWriterClass = 
props.getString(ERROR_TABLE_WRITE_CLASS.key());
     
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
         "Missing error table config " + ERROR_TABLE_WRITE_CLASS);
@@ -65,7 +64,7 @@ public final class ErrorTableUtils {
     try {
       return Option.of((BaseErrorTableWriter) 
ReflectionUtils.getClass(errorTableWriterClass)
           .getConstructor(argClassArr)
-          .newInstance(cfg, sparkSession, props, hoodieSparkContext, storage));
+          .newInstance(cfg, sparkSession, props, hoodieSparkContext, 
fileSystem));
     } catch (NoSuchMethodException | InvocationTargetException | 
InstantiationException
              | IllegalAccessException e) {
       throw new HoodieException(errMsg, e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 5af958d108b..4ea84ff7a5e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -58,8 +58,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -132,26 +132,24 @@ public class HoodieStreamer implements Serializable {
   public static final String STREAMSYNC_POOL_NAME = "hoodiedeltasync";
 
   public HoodieStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
-    this(cfg, jssc,
-        HoodieStorageUtils.getStorage(cfg.targetBasePath, 
HadoopFSUtils.getStorageConf(jssc.hadoopConfiguration())),
+    this(cfg, jssc, HadoopFSUtils.getFs(cfg.targetBasePath, 
jssc.hadoopConfiguration()),
         jssc.hadoopConfiguration(), Option.empty());
   }
 
   public HoodieStreamer(Config cfg, JavaSparkContext jssc, 
Option<TypedProperties> props) throws IOException {
-    this(cfg, jssc,
-        HoodieStorageUtils.getStorage(cfg.targetBasePath, 
HadoopFSUtils.getStorageConf(jssc.hadoopConfiguration())),
+    this(cfg, jssc, HadoopFSUtils.getFs(cfg.targetBasePath, 
jssc.hadoopConfiguration()),
         jssc.hadoopConfiguration(), props);
   }
 
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage 
storage, Configuration conf) throws IOException {
-    this(cfg, jssc, storage, conf, Option.empty());
+  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
Configuration conf) throws IOException {
+    this(cfg, jssc, fs, conf, Option.empty());
   }
 
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage 
storage, Configuration conf, Option<TypedProperties> propsOverride) throws 
IOException {
-    this(cfg, jssc, storage, conf, propsOverride, Option.empty());
+  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
+    this(cfg, jssc, fs, conf, propsOverride, Option.empty());
   }
 
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage 
storage, Configuration conf,
+  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
Configuration conf,
                         Option<TypedProperties> propsOverride, 
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
     this.properties = combineProperties(cfg, propsOverride, 
jssc.hadoopConfiguration());
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
@@ -163,11 +161,10 @@ public class HoodieStreamer implements Serializable {
 
     this.cfg = cfg;
     this.bootstrapExecutor = Option.ofNullable(
-        cfg.runBootstrap ? new BootstrapExecutor(
-            cfg, jssc, (FileSystem) storage.getFileSystem(), conf, 
this.properties) : null);
+        cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, 
this.properties) : null);
     HoodieSparkEngineContext sparkEngineContext = new 
HoodieSparkEngineContext(jssc);
     this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new StreamSyncService(cfg, 
sparkEngineContext, storage, conf, Option.ofNullable(this.properties), 
sourceProfileSupplier));
+        cfg.runBootstrap ? null : new StreamSyncService(cfg, 
sparkEngineContext, fs, conf, Option.ofNullable(this.properties), 
sourceProfileSupplier));
   }
 
   private static TypedProperties combineProperties(Config cfg, 
Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -672,14 +669,14 @@ public class HoodieStreamer implements Serializable {
     private final Option<ConfigurationHotUpdateStrategy> 
configurationHotUpdateStrategyOpt;
 
     public StreamSyncService(Config cfg, HoodieSparkEngineContext 
hoodieSparkContext,
-                             HoodieStorage storage, Configuration conf,
+                             FileSystem fs, Configuration conf,
                              Option<TypedProperties> properties, 
Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
       super(HoodieIngestionConfig.newBuilder()
           .isContinuous(cfg.continuousMode)
           .withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
       this.cfg = cfg;
       this.hoodieSparkContext = hoodieSparkContext;
-      this.storage = storage;
+      this.storage = new HoodieHadoopStorage(fs);
       this.hiveConf = conf;
       this.sparkSession = 
SparkSession.builder().config(hoodieSparkContext.getConf()).getOrCreate();
       this.asyncCompactService = Option.empty();
@@ -732,20 +729,19 @@ public class HoodieStreamer implements Serializable {
           props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
 
       streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext,
-          this.storage, conf, this::onInitializingWriteClient, new 
DefaultStreamContext(schemaProvider, sourceProfileSupplier));
-
+          fs, conf, this::onInitializingWriteClient, new 
DefaultStreamContext(schemaProvider, sourceProfileSupplier));
     }
 
     public StreamSyncService(HoodieStreamer.Config cfg,
-                             HoodieSparkEngineContext hoodieSparkContext, 
HoodieStorage storage,
+                             HoodieSparkEngineContext hoodieSparkContext, 
FileSystem fs,
                              Configuration conf)
         throws IOException {
-      this(cfg, hoodieSparkContext, storage, conf, Option.empty(), 
Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty(), Option.empty());
     }
 
-    public StreamSyncService(HoodieStreamer.Config cfg, 
HoodieSparkEngineContext hoodieSparkContext, HoodieStorage storage, 
Configuration conf, Option<TypedProperties> properties)
+    public StreamSyncService(HoodieStreamer.Config cfg, 
HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, 
Option<TypedProperties> properties)
             throws IOException {
-      this(cfg, hoodieSparkContext, storage, conf, properties, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, properties, Option.empty());
     }
 
     private void initializeTableTypeAndBaseFileFormat() {
@@ -760,7 +756,8 @@ public class HoodieStreamer implements Serializable {
         streamSync.close();
       }
       streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext,
-          storage, hiveConf, this::onInitializingWriteClient, new 
DefaultStreamContext(schemaProvider, Option.empty()));
+          (FileSystem) storage.getFileSystem(), hiveConf, 
this::onInitializingWriteClient,
+          new DefaultStreamContext(schemaProvider, Option.empty()));
     }
 
     @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 20e530c2ee7..d5405645782 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -292,18 +292,18 @@ public class StreamSync implements Serializable, Closeable {
                     TypedProperties props, JavaSparkContext jssc, FileSystem 
fs, Configuration conf,
                     Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient) throws IOException {
     this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc),
-        new HoodieHadoopStorage(fs), conf, onInitializingHoodieWriteClient,
+        fs, conf, onInitializingHoodieWriteClient,
         new DefaultStreamContext(schemaProvider, Option.empty()));
   }
 
   public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
                     TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext,
-                    HoodieStorage storage, Configuration conf,
+                    FileSystem fs, Configuration conf,
                     Function<SparkRDDWriteClient, Boolean> 
onInitializingHoodieWriteClient, StreamContext streamContext) throws 
IOException {
     this.cfg = cfg;
     this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
-    this.storage = storage;
+    this.storage = new HoodieHadoopStorage(fs);
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
     this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
@@ -319,7 +319,7 @@ public class StreamSync implements Serializable, Closeable {
     this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig, 
storage.getConf());
     if (props.getBoolean(ERROR_TABLE_ENABLED.key(), 
ERROR_TABLE_ENABLED.defaultValue())) {
       this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(
-          cfg, sparkSession, props, hoodieSparkContext, storage);
+          cfg, sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = 
ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
     refreshTimeline();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index c6f2afc2ef7..e4670799830 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieErrorTableConfig;
-import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
@@ -42,6 +41,7 @@ import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -327,8 +327,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
     public static Map<String,Option<JavaRDD>> commited = new HashMap<>();
 
     public TestErrorTable(HoodieStreamer.Config cfg, SparkSession 
sparkSession, TypedProperties props, HoodieSparkEngineContext 
hoodieSparkContext,
-                          HoodieStorage storage) {
-      super(cfg, sparkSession, props, hoodieSparkContext, storage);
+                          FileSystem fileSystem) {
+      super(cfg, sparkSession, props, hoodieSparkContext, fileSystem);
     }
 
     @Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 92238721fcd..90d8543a5db 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -296,7 +296,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
 
   private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties 
props) {
     return new BaseErrorTableWriter<ErrorEvent<String>>(new 
HoodieDeltaStreamer.Config(),
-        spark(), props, new HoodieSparkEngineContext(jsc()), hoodieStorage()) {
+        spark(), props, new HoodieSparkEngineContext(jsc()), fs()) {
       List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList();
 
       @Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java
new file mode 100644
index 00000000000..1a3af25cd59
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import 
org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamerSchemaEvolutionBase.TestErrorTable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link ErrorTableUtils}.
+ */
+public class TestErrorTableUtils {
+  @Test
+  public void testGetErrorTableWriter() {
+    SparkSession sparkSession = Mockito.mock(SparkSession.class);
+    HoodieSparkEngineContext sparkContext = 
Mockito.mock(HoodieSparkEngineContext.class);
+    FileSystem fileSystem = Mockito.mock(FileSystem.class);
+
+    TypedProperties props = new TypedProperties();
+    // No error table writer config
+    assertThrows(IllegalArgumentException.class,
+        () -> ErrorTableUtils.getErrorTableWriter(
+            new HoodieStreamer.Config(), sparkSession, props, sparkContext, 
fileSystem));
+
+    // Empty error table writer config
+    props.put("hoodie.errortable.write.class", StringUtils.EMPTY_STRING);
+    assertThrows(IllegalStateException.class,
+        () -> ErrorTableUtils.getErrorTableWriter(
+            new HoodieStreamer.Config(), sparkSession, props, sparkContext, 
fileSystem));
+
+    // Proper error table writer config
+    props.put("hoodie.errortable.write.class", TestErrorTable.class.getName());
+    assertTrue(ErrorTableUtils.getErrorTableWriter(
+        new HoodieStreamer.Config(), sparkSession, props, sparkContext, 
fileSystem).get() instanceof TestErrorTable);
+
+    // Wrong error table writer config
+    props.put("hoodie.errortable.write.class", HoodieConfig.class.getName());
+    assertThrows(HoodieException.class,
+        () -> ErrorTableUtils.getErrorTableWriter(
+            new HoodieStreamer.Config(), sparkSession, props, sparkContext, 
fileSystem));
+  }
+}

Reply via email to