This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.15.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b71da938ac5ce7913965a2e7dfb865cc40e901b5
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu May 23 23:25:27 2024 -0700

    [HUDI-7785] Keep public APIs in utilities module the same as before HoodieStorage abstraction (#11280)
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  3 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  5 +-
 .../utilities/streamer/BaseErrorTableWriter.java   | 13 ++--
 .../hudi/utilities/streamer/ErrorTableUtils.java   |  5 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    | 41 ++++++-------
 .../apache/hudi/utilities/streamer/StreamSync.java |  8 +--
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |  6 +-
 .../utilities/sources/TestJsonKafkaSource.java     |  2 +-
 .../utilities/streamer/TestErrorTableUtils.java    | 70 ++++++++++++++++++++++
 9 files changed, 111 insertions(+), 42 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c8a1b47b9fb..24fa9b97051 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.streamer.DefaultStreamContext;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
@@ -53,6 +52,6 @@ public class DeltaSync extends StreamSync {
                    TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs,
                    Configuration conf, Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
     super(cfg, sparkSession, props, hoodieSparkContext,
-        new HoodieHadoopStorage(fs), conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+        fs, conf, onInitializingHoodieWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 6c5cca9888e..8d941886a08 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -51,7 +50,7 @@ public class HoodieDeltaStreamer extends HoodieStreamer {
                              JavaSparkContext jssc,
                              FileSystem fs,
                              Configuration conf) throws IOException {
-    super(cfg, jssc, new HoodieHadoopStorage(fs), conf);
+    super(cfg, jssc, fs, conf);
   }
 
   public HoodieDeltaStreamer(Config cfg,
@@ -59,7 +58,7 @@ public class HoodieDeltaStreamer extends HoodieStreamer {
                              FileSystem fs,
                              Configuration conf,
                              Option<TypedProperties> propsOverride) throws IOException {
-    super(cfg, jssc, new HoodieHadoopStorage(fs), conf, propsOverride);
+    super(cfg, jssc, fs, conf, propsOverride);
   }
 
   @Deprecated
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
index b9d18dbd916..5c25d68d2c4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BaseErrorTableWriter.java
@@ -19,13 +19,16 @@
 
 package org.apache.hudi.utilities.streamer;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hadoop.fs.FileSystem;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.SparkSession;
 
@@ -40,6 +43,7 @@ import java.io.Serializable;
  *
  * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
  */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
 public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Serializable {
 
   // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
@@ -47,8 +51,7 @@ public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Seri
   public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
 
   public BaseErrorTableWriter(HoodieStreamer.Config cfg, SparkSession sparkSession,
-                              TypedProperties props,
-                              HoodieSparkEngineContext hoodieSparkContext, HoodieStorage storage) {
+                              TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fileSystem) {
   }
 
   /**
@@ -57,18 +60,20 @@ public abstract class BaseErrorTableWriter<T extends ErrorEvent> implements Seri
    *
    * @param errorEvent Input error event RDD
    */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract void addErrorEvents(JavaRDD<T> errorEvent);
 
   /**
    * Fetches the error events RDD processed by the writer so far. This is a test API.
   */
   @VisibleForTesting
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime, Option<String> commitedInstantTime);
 
   /**
   * This API is called to commit the error events (failed Hoodie Records) processed by the writer so far.
   * These records are committed to a error table.
   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public abstract boolean upsertAndCommit(String baseTableInstantTime, Option<String> commitedInstantTime);
-
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
index fce14d18807..9d9de91fbba 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.storage.HoodieStorage;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.sql.Dataset;
@@ -48,7 +47,7 @@ public final class ErrorTableUtils {
                                                              SparkSession sparkSession,
                                                              TypedProperties props,
                                                              HoodieSparkEngineContext hoodieSparkContext,
-                                                             HoodieStorage storage) {
+                                                             FileSystem fileSystem) {
     String errorTableWriterClass = props.getString(ERROR_TABLE_WRITE_CLASS.key());
     ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
         "Missing error table config " + ERROR_TABLE_WRITE_CLASS);
@@ -65,7 +64,7 @@ public final class ErrorTableUtils {
     try {
       return Option.of((BaseErrorTableWriter) ReflectionUtils.getClass(errorTableWriterClass)
          .getConstructor(argClassArr)
-          .newInstance(cfg, sparkSession, props, hoodieSparkContext, storage));
+          .newInstance(cfg, sparkSession, props, hoodieSparkContext, fileSystem));
     } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
         | IllegalAccessException e) {
       throw new HoodieException(errMsg, e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 5af958d108b..4ea84ff7a5e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -58,8 +58,8 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -132,26 +132,24 @@ public class HoodieStreamer implements Serializable {
   public static final String STREAMSYNC_POOL_NAME = "hoodiedeltasync";
 
   public HoodieStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
-    this(cfg, jssc,
-        HoodieStorageUtils.getStorage(cfg.targetBasePath, HadoopFSUtils.getStorageConf(jssc.hadoopConfiguration())),
+    this(cfg, jssc, HadoopFSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
         jssc.hadoopConfiguration(), Option.empty());
   }
 
   public HoodieStreamer(Config cfg, JavaSparkContext jssc, Option<TypedProperties> props) throws IOException {
-    this(cfg, jssc,
-        HoodieStorageUtils.getStorage(cfg.targetBasePath, HadoopFSUtils.getStorageConf(jssc.hadoopConfiguration())),
+    this(cfg, jssc, HadoopFSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
         jssc.hadoopConfiguration(), props);
   }
 
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage storage, Configuration conf) throws IOException {
-    this(cfg, jssc, storage, conf, Option.empty());
+  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException {
+    this(cfg, jssc, fs, conf, Option.empty());
   }
 
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage storage, Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
-    this(cfg, jssc, storage, conf, propsOverride, Option.empty());
+  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, Option<TypedProperties> propsOverride) throws IOException {
+    this(cfg, jssc, fs, conf, propsOverride, Option.empty());
   }
 
-  public HoodieStreamer(Config cfg, JavaSparkContext jssc, HoodieStorage storage, Configuration conf,
+  public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                         Option<TypedProperties> propsOverride, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
     this.properties = combineProperties(cfg, propsOverride, jssc.hadoopConfiguration());
     if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
@@ -163,11 +161,10 @@ public class HoodieStreamer implements Serializable {
     this.cfg = cfg;
     this.bootstrapExecutor = Option.ofNullable(
-        cfg.runBootstrap ? new BootstrapExecutor(
-            cfg, jssc, (FileSystem) storage.getFileSystem(), conf, this.properties) : null);
+        cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
     HoodieSparkEngineContext sparkEngineContext = new HoodieSparkEngineContext(jssc);
     this.ingestionService = Option.ofNullable(
-        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, storage, conf, Option.ofNullable(this.properties), sourceProfileSupplier));
+        cfg.runBootstrap ? null : new StreamSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties), sourceProfileSupplier));
   }
 
   private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -672,14 +669,14 @@ public class HoodieStreamer implements Serializable {
     private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;
 
     public StreamSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext,
-                             HoodieStorage storage, Configuration conf,
+                             FileSystem fs, Configuration conf,
                              Option<TypedProperties> properties, Option<SourceProfileSupplier> sourceProfileSupplier) throws IOException {
       super(HoodieIngestionConfig.newBuilder()
           .isContinuous(cfg.continuousMode)
           .withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
       this.cfg = cfg;
       this.hoodieSparkContext = hoodieSparkContext;
-      this.storage = storage;
+      this.storage = new HoodieHadoopStorage(fs);
      this.hiveConf = conf;
       this.sparkSession = SparkSession.builder().config(hoodieSparkContext.getConf()).getOrCreate();
       this.asyncCompactService = Option.empty();
@@ -732,20 +729,19 @@ public class HoodieStreamer implements Serializable {
           props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
 
       streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext,
-          this.storage, conf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, sourceProfileSupplier));
-
+          fs, conf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, sourceProfileSupplier));
     }
 
     public StreamSyncService(HoodieStreamer.Config cfg,
-                             HoodieSparkEngineContext hoodieSparkContext, HoodieStorage storage,
+                             HoodieSparkEngineContext hoodieSparkContext, FileSystem fs,
                              Configuration conf) throws IOException {
-      this(cfg, hoodieSparkContext, storage, conf, Option.empty(), Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, Option.empty(), Option.empty());
     }
 
-    public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, HoodieStorage storage, Configuration conf, Option<TypedProperties> properties)
+    public StreamSyncService(HoodieStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf, Option<TypedProperties> properties)
         throws IOException {
-      this(cfg, hoodieSparkContext, storage, conf, properties, Option.empty());
+      this(cfg, hoodieSparkContext, fs, conf, properties, Option.empty());
     }
 
     private void initializeTableTypeAndBaseFileFormat() {
@@ -760,7 +756,8 @@ public class HoodieStreamer implements Serializable {
         streamSync.close();
       }
       streamSync = new StreamSync(cfg, sparkSession, props, hoodieSparkContext,
-          storage, hiveConf, this::onInitializingWriteClient, new DefaultStreamContext(schemaProvider, Option.empty()));
+          (FileSystem) storage.getFileSystem(), hiveConf, this::onInitializingWriteClient,
+          new DefaultStreamContext(schemaProvider, Option.empty()));
     }
 
     @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 20e530c2ee7..d5405645782 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -292,18 +292,18 @@ public class StreamSync implements Serializable, Closeable {
                     TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                     Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
     this(cfg, sparkSession, props, new HoodieSparkEngineContext(jssc),
-        new HoodieHadoopStorage(fs), conf, onInitializingHoodieWriteClient,
+        fs, conf, onInitializingHoodieWriteClient,
         new DefaultStreamContext(schemaProvider, Option.empty()));
   }
 
   public StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
                     TypedProperties props, HoodieSparkEngineContext hoodieSparkContext,
-                    HoodieStorage storage, Configuration conf,
+                    FileSystem fs, Configuration conf,
                     Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient,
                     StreamContext streamContext) throws IOException {
     this.cfg = cfg;
     this.hoodieSparkContext = hoodieSparkContext;
     this.sparkSession = sparkSession;
-    this.storage = storage;
+    this.storage = new HoodieHadoopStorage(fs);
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
     this.userProvidedSchemaProvider = streamContext.getSchemaProvider();
@@ -319,7 +319,7 @@ public class StreamSync implements Serializable, Closeable {
     this.hoodieMetrics = new HoodieMetrics(hoodieWriteConfig, storage.getConf());
     if (props.getBoolean(ERROR_TABLE_ENABLED.key(), ERROR_TABLE_ENABLED.defaultValue())) {
       this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(
-          cfg, sparkSession, props, hoodieSparkContext, storage);
+          cfg, sparkSession, props, hoodieSparkContext, fs);
       this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
     }
     refreshTimeline();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index c6f2afc2ef7..e4670799830 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieErrorTableConfig;
-import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
@@ -42,6 +41,7 @@ import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -327,8 +327,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
     public static Map<String,Option<JavaRDD>> commited = new HashMap<>();
 
     public TestErrorTable(HoodieStreamer.Config cfg, SparkSession sparkSession, TypedProperties props, HoodieSparkEngineContext hoodieSparkContext,
-                          HoodieStorage storage) {
-      super(cfg, sparkSession, props, hoodieSparkContext, storage);
+                          FileSystem fileSystem) {
+      super(cfg, sparkSession, props, hoodieSparkContext, fileSystem);
     }
 
     @Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 92238721fcd..90d8543a5db 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -296,7 +296,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
 
   private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties props) {
     return new BaseErrorTableWriter<ErrorEvent<String>>(new HoodieDeltaStreamer.Config(),
-        spark(), props, new HoodieSparkEngineContext(jsc()), hoodieStorage()) {
+        spark(), props, new HoodieSparkEngineContext(jsc()), fs()) {
       List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList();
 
       @Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java
new file mode 100644
index 00000000000..1a3af25cd59
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestErrorTableUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamerSchemaEvolutionBase.TestErrorTable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link ErrorTableUtils}.
+ */
+public class TestErrorTableUtils {
+  @Test
+  public void testGetErrorTableWriter() {
+    SparkSession sparkSession = Mockito.mock(SparkSession.class);
+    HoodieSparkEngineContext sparkContext = Mockito.mock(HoodieSparkEngineContext.class);
+    FileSystem fileSystem = Mockito.mock(FileSystem.class);
+
+    TypedProperties props = new TypedProperties();
+    // No error table writer config
+    assertThrows(IllegalArgumentException.class,
+        () -> ErrorTableUtils.getErrorTableWriter(
+            new HoodieStreamer.Config(), sparkSession, props, sparkContext, fileSystem));
+
+    // Empty error table writer config
+    props.put("hoodie.errortable.write.class", StringUtils.EMPTY_STRING);
+    assertThrows(IllegalStateException.class,
+        () -> ErrorTableUtils.getErrorTableWriter(
+            new HoodieStreamer.Config(), sparkSession, props, sparkContext, fileSystem));
+
+    // Proper error table writer config
+    props.put("hoodie.errortable.write.class", TestErrorTable.class.getName());
+    assertTrue(ErrorTableUtils.getErrorTableWriter(
+        new HoodieStreamer.Config(), sparkSession, props, sparkContext, fileSystem).get() instanceof TestErrorTable);
+
+    // Wrong error table writer config
+    props.put("hoodie.errortable.write.class", HoodieConfig.class.getName());
+    assertThrows(HoodieException.class,
+        () -> ErrorTableUtils.getErrorTableWriter(
+            new HoodieStreamer.Config(), sparkSession, props, sparkContext, fileSystem));
+  }
+}
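
Note on the restored public API: ErrorTableUtils.getErrorTableWriter reflectively instantiates the class configured under hoodie.errortable.write.class with the (cfg, sparkSession, props, hoodieSparkContext, fileSystem) constructor shown above, so custom error table writers keep the pre-HoodieStorage, FileSystem-based signature. A minimal sketch of such a writer follows; the class name and no-op bodies are illustrative only (not part of this commit), and it overrides just the abstract methods visible in this diff.

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.streamer.BaseErrorTableWriter;
import org.apache.hudi.utilities.streamer.ErrorEvent;
import org.apache.hudi.utilities.streamer.HoodieStreamer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

// Hypothetical writer that simply discards error events; only the constructor signature matters here.
public class NoOpErrorTableWriter extends BaseErrorTableWriter<ErrorEvent<String>> {

  public NoOpErrorTableWriter(HoodieStreamer.Config cfg, SparkSession sparkSession,
                              TypedProperties props, HoodieSparkEngineContext hoodieSparkContext,
                              FileSystem fileSystem) {
    // Must keep the (cfg, sparkSession, props, hoodieSparkContext, fileSystem) shape that
    // ErrorTableUtils.getErrorTableWriter instantiates via reflection.
    super(cfg, sparkSession, props, hoodieSparkContext, fileSystem);
  }

  @Override
  public void addErrorEvents(JavaRDD<ErrorEvent<String>> errorEvent) {
    // Drop error events in this sketch.
  }

  @Override
  public Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime,
                                                          Option<String> commitedInstantTime) {
    return Option.empty();
  }

  @Override
  public boolean upsertAndCommit(String baseTableInstantTime, Option<String> commitedInstantTime) {
    return true;
  }
}

Enabling it is then a matter of setting hoodie.errortable.write.class to the writer's class name, as exercised by the new TestErrorTableUtils above.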

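Callers that construct the streamer directly likewise keep compiling against the FileSystem-based constructors restored here. A hedged usage sketch, assuming a local Spark context and placeholder table settings (source, schema provider, and other required streamer options are omitted, so this outlines the wiring rather than a complete job):

import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaSparkContext;

public class StreamerLauncher {
  public static void main(String[] args) throws Exception {
    // Local Spark context for illustration; a real deployment would go through spark-submit.
    JavaSparkContext jssc = new JavaSparkContext("local[2]", "hoodie-streamer-example");
    Configuration hadoopConf = jssc.hadoopConfiguration();

    // Placeholder table settings; source and schema-provider options are left out here.
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.targetBasePath = "file:///tmp/hoodie/example_table";
    cfg.targetTableName = "example_table";
    cfg.tableType = "COPY_ON_WRITE";

    // Same helper the restored constructors use internally to resolve the FileSystem.
    FileSystem fs = HadoopFSUtils.getFs(cfg.targetBasePath, hadoopConf);

    // Public constructor keeps the pre-HoodieStorage (FileSystem-based) signature;
    // internally the streamer wraps fs in a HoodieHadoopStorage.
    HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(cfg, jssc, fs, hadoopConf);
    streamer.sync();
  }
}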