This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 407389ae9a2 [HUDI-6406] Pass in Spark Engine Context Wrapper for DeltaSync instead of spark engine context (#9008)
407389ae9a2 is described below
commit 407389ae9a255e40c558ba14114bc7c3699f24a8
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Fri Jun 23 14:30:23 2023 -0700
[HUDI-6406] Pass in Spark Engine Context Wrapper for DeltaSync instead of spark engine context (#9008)
Co-authored-by: rmahindra123 <[email protected]>
---
.../client/common/HoodieFlinkEngineContext.java | 10 +++++
.../client/common/HoodieJavaEngineContext.java | 10 +++++
.../client/common/HoodieSparkEngineContext.java | 35 ++++++++++++++++--
.../apache/hudi/common/engine/EngineProperty.java | 3 +-
.../hudi/common/engine/HoodieEngineContext.java | 4 ++
.../common/engine/HoodieLocalEngineContext.java | 10 +++++
.../deltastreamer/BaseErrorTableWriter.java | 4 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 43 +++++++++++++---------
.../utilities/deltastreamer/ErrorTableUtils.java | 8 ++--
.../deltastreamer/HoodieDeltaStreamer.java | 42 ++++++++++++---------
.../utilities/sources/TestJsonKafkaSource.java | 5 ++-
11 files changed, 127 insertions(+), 47 deletions(-)
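In short, DeltaSync and DeltaSyncService now take the HoodieSparkEngineContext wrapper rather than a raw JavaSparkContext. A minimal caller-side sketch, assuming pre-built cfg, props, schemaProvider, sparkSession, fs, and conf objects (none of these variable names are part of the commit):

    // Wrap the JavaSparkContext once and hand the wrapper around.
    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jssc);

    // New constructor: takes the engine context wrapper directly.
    DeltaSync deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props,
        engineContext, fs, conf, writeClient -> true);

    // The JavaSparkContext overload still compiles but is now @Deprecated;
    // it simply wraps jssc in a HoodieSparkEngineContext and delegates.
    DeltaSync legacy = new DeltaSync(cfg, sparkSession, schemaProvider, props,
        jssc, fs, conf, writeClient -> true);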
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index db5c6ebd296..a62ca42d6b3 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -181,6 +181,16 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
return Collections.emptyList();
}
+ @Override
+ public void cancelJob(String jobId) {
+ // no operation for now
+ }
+
+ @Override
+ public void cancelAllJobs() {
+ // no operation for now
+ }
+
/**
* Override the flink context supplier to return constant write token.
*/
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index 6ab8e5ab029..5f6751b9961 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -161,4 +161,14 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
return Collections.emptyList();
}
+
+ @Override
+ public void cancelJob(String jobId) {
+ // no operation for now
+ }
+
+ @Override
+ public void cancelAllJobs() {
+ // no operation for now
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index c23a333f711..f3b87df040d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -37,6 +37,9 @@ import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.data.HoodieSparkLongAccumulator;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SQLContext;
@@ -79,6 +82,10 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
return javaSparkContext;
}
+ public JavaSparkContext jsc() {
+ return javaSparkContext;
+ }
+
public SQLContext getSqlContext() {
return sqlContext;
}
@@ -165,9 +172,9 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
@Override
public void setProperty(EngineProperty key, String value) {
- if (key == EngineProperty.COMPACTION_POOL_NAME) {
- javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
- } else if (key == EngineProperty.CLUSTERING_POOL_NAME) {
+ if (key.equals(EngineProperty.COMPACTION_POOL_NAME)
+ || key.equals(EngineProperty.CLUSTERING_POOL_NAME)
+ || key.equals(EngineProperty.DELTASYNC_POOL_NAME)) {
javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
} else {
throw new HoodieException("Unknown engine property :" + key);
@@ -211,4 +218,26 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
return removed == null ? Collections.emptyList() : removed;
}
}
+
+ @Override
+ public void cancelJob(String groupId) {
+ javaSparkContext.cancelJobGroup(groupId);
+ }
+
+ @Override
+ public void cancelAllJobs() {
+ javaSparkContext.cancelAllJobs();
+ }
+
+ public SparkConf getConf() {
+ return javaSparkContext.getConf();
+ }
+
+ public Configuration hadoopConfiguration() {
+ return javaSparkContext.hadoopConfiguration();
+ }
+
+ public <T> JavaRDD<T> emptyRDD() {
+ return javaSparkContext.emptyRDD();
+ }
}
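With jsc(), getConf(), hadoopConfiguration(), and emptyRDD() added alongside the cancel hooks, the wrapper now covers everything DeltaSync previously pulled from the raw JavaSparkContext. A hedged sketch of the new cancellation surface (the group id string is illustrative, not from the patch):

    // cancelJob delegates to JavaSparkContext.cancelJobGroup on Spark;
    // the Flink, Java, and local engine contexts are no-ops for now.
    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jssc);
    engineContext.cancelJob("delta-sync-group"); // illustrative group id
    engineContext.cancelAllJobs();               // e.g. during shutdown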
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
index 36e7594937b..08467a6d193 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineProperty.java
@@ -31,5 +31,6 @@ public enum EngineProperty {
// Amount of total memory available to each engine executor
TOTAL_MEMORY_AVAILABLE,
// Fraction of that memory, that is already in use by the engine
- MEMORY_FRACTION_IN_USE
+ MEMORY_FRACTION_IN_USE,
+ DELTASYNC_POOL_NAME
}
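The new DELTASYNC_POOL_NAME constant lets engine-agnostic code request a scheduler pool; per the HoodieSparkEngineContext hunk above, all three pool-name properties map to the same spark.scheduler.pool local property. A sketch (the pool name literal is illustrative):

    // On Spark this becomes
    // javaSparkContext.setLocalProperty("spark.scheduler.pool", "hoodiedeltasync");
    // an unknown EngineProperty key still throws HoodieException.
    engineContext.setProperty(EngineProperty.DELTASYNC_POOL_NAME, "hoodiedeltasync");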
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index c123c279644..79d62d55770 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -101,4 +101,8 @@ public abstract class HoodieEngineContext {
public abstract List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey);
public abstract List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey);
+
+ public abstract void cancelJob(String jobId);
+
+ public abstract void cancelAllJobs();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 26190b790ca..5239490816d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -160,4 +160,14 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
return Collections.emptyList();
}
+
+ @Override
+ public void cancelJob(String jobId) {
+ // no operation for now
+ }
+
+ @Override
+ public void cancelAllJobs() {
+ // no operation for now
+ }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
index fea6bdb3cf3..e0bba0600ba 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.deltastreamer;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.util.Option;
@@ -25,7 +26,6 @@ import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
@@ -44,7 +44,7 @@ public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
- TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+ TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs) {
}
/**
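This constructor change is source-incompatible for custom error table writers: ErrorTableUtils instantiates them reflectively against the new argument list, so subclasses must declare the HoodieSparkEngineContext parameter. A minimal sketch of a conforming subclass (the class name is illustrative, and the abstract event-capture/commit methods are elided):

    public class NoOpErrorTableWriter extends BaseErrorTableWriter<ErrorEvent<String>> {
      public NoOpErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
                                  TypedProperties props, HoodieSparkEngineContext hoodieSparkContext,
                                  FileSystem fs) {
        super(cfg, sparkSession, props, hoodieSparkContext, fs);
      }
      // abstract methods of BaseErrorTableWriter elided for brevity
    }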
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 38582f0f8f4..19ca149154c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -209,9 +209,9 @@ public class DeltaSync implements Serializable, Closeable {
private transient FileSystem fs;
/**
- * Spark context.
+ * Spark context Wrapper.
*/
- private transient JavaSparkContext jssc;
+ private final transient HoodieSparkEngineContext hoodieSparkContext;
/**
* Spark Session.
@@ -278,12 +278,18 @@ public class DeltaSync implements Serializable, Closeable {
private final boolean autoGenerateRecordKeys;
+ @Deprecated
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
                 TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
                 Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+ this(cfg, sparkSession, schemaProvider, props, new HoodieSparkEngineContext(jssc), fs, conf, onInitializingHoodieWriteClient);
+ }
+ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
+                  TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
+                  Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
this.cfg = cfg;
- this.jssc = jssc;
+ this.hoodieSparkContext = hoodieSparkContext;
this.sparkSession = sparkSession;
this.fs = fs;
this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
@@ -306,11 +312,11 @@ public class DeltaSync implements Serializable, Closeable {
}
this.multiwriterIdentifier = StringUtils.isNullOrEmpty(id) ? Option.empty() : Option.of(id);
if (props.getBoolean(ERROR_TABLE_ENABLED.key(),ERROR_TABLE_ENABLED.defaultValue())) {
- this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, jssc, fs);
+ this.errorTableWriter = ErrorTableUtils.getErrorTableWriter(cfg, sparkSession, props, hoodieSparkContext, fs);
this.errorWriteFailureStrategy = ErrorTableUtils.getErrorWriteFailureStrategy(props);
}
this.formatAdapter = new SourceFormatAdapter(
- UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics),
+ UtilHelpers.createSource(cfg.sourceClassName, props, hoodieSparkContext.jsc(), sparkSession, schemaProvider, metrics),
this.errorTableWriter, Option.of(props));
this.transformer = UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
@@ -399,7 +405,7 @@ public class DeltaSync implements Serializable, Closeable {
Boolean.parseBoolean(HIVE_STYLE_PARTITIONING_ENABLE.defaultValue())))
.setUrlEncodePartitioning(props.getBoolean(URL_ENCODE_PARTITIONING.key(),
Boolean.parseBoolean(URL_ENCODE_PARTITIONING.defaultValue())))
- .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
+ .initTable(new Configuration(hoodieSparkContext.hadoopConfiguration()), cfg.targetBasePath);
}
@@ -539,7 +545,7 @@ public class DeltaSync implements Serializable, Closeable {
formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
Option<Dataset<Row>> transformed =
- dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
+ dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(hoodieSparkContext.jsc(), sparkSession, data, props));
transformed = formatAdapter.processErrorEvents(transformed, ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
@@ -571,7 +577,7 @@ public class DeltaSync implements Serializable, Closeable {
}
schemaProvider = this.userProvidedSchemaProvider;
} else {
- Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+ Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath);
// Deduce proper target (writer's) schema for the transformed dataset, reconciling its
// schema w/ the table's one
Option<Schema> targetSchemaOpt = transformed.map(df -> {
@@ -587,8 +593,8 @@ public class DeltaSync implements Serializable, Closeable {
});
// Override schema provider with the reconciled target schema
schemaProvider = targetSchemaOpt.map(targetSchema ->
- (SchemaProvider) new DelegatingSchemaProvider(props, jssc, dataAndCheckpoint.getSchemaProvider(),
- new SimpleSchemaProvider(jssc, targetSchema, props)))
+ (SchemaProvider) new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), dataAndCheckpoint.getSchemaProvider(),
+ new SimpleSchemaProvider(hoodieSparkContext.jsc(), targetSchema, props)))
.orElse(dataAndCheckpoint.getSchemaProvider());
// Rewrite transformed records into the expected target schema
avroRDDOptional = transformed.map(t -> getTransformedRDD(t, reconcileSchema, schemaProvider.getTargetSchema()));
@@ -610,10 +616,10 @@ public class DeltaSync implements Serializable, Closeable {
return null;
}
- jssc.setJobGroup(this.getClass().getSimpleName(), "Checking if input is empty");
+ hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
LOG.info("No new data, perform empty commit.");
- return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
+ return Pair.of(schemaProvider, Pair.of(checkpointStr, hoodieSparkContext.emptyRDD()));
}
boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
@@ -786,7 +792,7 @@ public class DeltaSync implements Serializable, Closeable {
Option<String> scheduledCompactionInstant = Option.empty();
// filter dupes if needed
if (cfg.filterDupes) {
- records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
+ records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), records, writeClient.getConfig());
}
boolean isEmpty = records.isEmpty();
@@ -953,7 +959,7 @@ public class DeltaSync implements Serializable, Closeable {
LOG.info("When set --enable-hive-sync will use HiveSyncTool for backward
compatibility");
}
if (cfg.enableMetaSync) {
- FileSystem fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
+ FileSystem fs = FSUtils.getFs(cfg.targetBasePath, hoodieSparkContext.hadoopConfiguration());
TypedProperties metaProps = new TypedProperties();
metaProps.putAll(props);
@@ -1002,12 +1008,12 @@ public class DeltaSync implements Serializable, Closeable {
registerAvroSchemas(sourceSchema, targetSchema);
final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
- .getWriteConfigWithRecordSizeEstimate(jssc, records, initialWriteConfig)
+ .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), records, initialWriteConfig)
.orElse(initialWriteConfig);
if (writeConfig.isEmbeddedTimelineServerEnabled()) {
if (!embeddedTimelineService.isPresent()) {
- embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), writeConfig);
+ embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(hoodieSparkContext, writeConfig);
} else {
EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), writeConfig);
}
@@ -1017,7 +1023,7 @@ public class DeltaSync implements Serializable, Closeable {
// Close Write client.
writeClient.close();
}
- writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), writeConfig, embeddedTimelineService);
+ writeClient = new SparkRDDWriteClient<>(hoodieSparkContext, writeConfig, embeddedTimelineService);
onInitializingHoodieWriteClient.apply(writeClient);
}
@@ -1155,7 +1161,8 @@ public class DeltaSync implements Serializable, Closeable {
if (LOG.isDebugEnabled()) {
LOG.debug("Registering Schema: " + schemas);
}
- jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
+ // Use the underlying spark context in case the java context is changed during runtime
+ hoodieSparkContext.getJavaSparkContext().sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
}
}
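Condensed from the hunks above, the empty-input path in DeltaSync now goes entirely through the wrapper (a paraphrase of the patched code, not a new API):

    // setJobStatus replaces the direct jssc.setJobGroup call, and emptyRDD()
    // is the new convenience method on HoodieSparkEngineContext.
    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
    if (!avroRDDOptional.isPresent() || avroRDDOptional.get().isEmpty()) {
      LOG.info("No new data, perform empty commit.");
      return Pair.of(schemaProvider, Pair.of(checkpointStr, hoodieSparkContext.emptyRDD()));
    }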
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
index 76e7b030b6f..95400021ce7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.deltastreamer;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -29,7 +30,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -45,19 +45,19 @@ import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR
public final class ErrorTableUtils {
public static Option<BaseErrorTableWriter> getErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
- TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+ TypedProperties props, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs) {
String errorTableWriterClass = props.getString(ERROR_TABLE_WRITE_CLASS.key());
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
"Missing error table config " + ERROR_TABLE_WRITE_CLASS);
Class<?>[] argClassArr = new Class[]{HoodieDeltaStreamer.Config.class,
- SparkSession.class, TypedProperties.class, JavaSparkContext.class, FileSystem.class};
+ SparkSession.class, TypedProperties.class, HoodieSparkEngineContext.class, FileSystem.class};
String errMsg = "Unable to instantiate ErrorTableWriter with arguments
type " + Arrays.toString(argClassArr);
ValidationUtils.checkArgument(ReflectionUtils.hasConstructor(BaseErrorTableWriter.class.getName(),
argClassArr, false), errMsg);
try {
return Option.of((BaseErrorTableWriter) ReflectionUtils.getClass(errorTableWriterClass).getConstructor(argClassArr)
- .newInstance(cfg, sparkSession, props, jssc, fs));
+ .newInstance(cfg, sparkSession, props, hoodieSparkContext, fs));
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new HoodieException(errMsg, e);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 0ca939f08dd..f0631e3ef76 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -162,8 +163,10 @@ public class HoodieDeltaStreamer implements Serializable {
this.cfg = cfg;
this.bootstrapExecutor = Option.ofNullable(
cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
+
+ HoodieSparkEngineContext sparkEngineContext = new HoodieSparkEngineContext(jssc);
this.ingestionService = Option.ofNullable(
- cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, Option.ofNullable(this.properties)));
+ cfg.runBootstrap ? null : new DeltaSyncService(cfg, sparkEngineContext, fs, conf, Option.ofNullable(this.properties)));
}
private static TypedProperties combineProperties(Config cfg, Option<TypedProperties> propsOverride, Configuration hadoopConf) {
@@ -616,9 +619,9 @@ public class HoodieDeltaStreamer implements Serializable {
private transient SparkSession sparkSession;
/**
- * Spark context.
+ * Spark context Wrapper.
*/
- private transient JavaSparkContext jssc;
+ private final transient HoodieSparkEngineContext hoodieSparkContext;
private transient FileSystem fs;
@@ -653,16 +656,16 @@ public class HoodieDeltaStreamer implements Serializable {
private final Option<ConfigurationHotUpdateStrategy> configurationHotUpdateStrategyOpt;
- public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
+ public DeltaSyncService(Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf,
                          Option<TypedProperties> properties) throws IOException {
super(HoodieIngestionConfig.newBuilder()
.isContinuous(cfg.continuousMode)
.withMinSyncInternalSeconds(cfg.minSyncIntervalSeconds).build());
this.cfg = cfg;
- this.jssc = jssc;
+ this.hoodieSparkContext = hoodieSparkContext;
this.fs = fs;
this.hiveConf = conf;
- this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
+ this.sparkSession = SparkSession.builder().config(hoodieSparkContext.getConf()).getOrCreate();
this.asyncCompactService = Option.empty();
this.asyncClusteringService = Option.empty();
this.postWriteTerminationStrategy = StringUtils.isNullOrEmpty(cfg.postWriteTerminationStrategyClass) ? Option.empty() :
@@ -706,15 +709,15 @@ public class HoodieDeltaStreamer implements Serializable {
LOG.info(toSortedTruncatedString(props));
this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
- UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc, cfg.transformerClassNames);
+ UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, hoodieSparkContext.jsc()),
+ props, hoodieSparkContext.jsc(), cfg.transformerClassNames);
- deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, conf,
- this::onInitializingWriteClient);
+ deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, conf, this::onInitializingWriteClient);
}
- public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf)
+ public DeltaSyncService(HoodieDeltaStreamer.Config cfg, HoodieSparkEngineContext hoodieSparkContext, FileSystem fs, Configuration conf)
throws IOException {
- this(cfg, jssc, fs, conf, Option.empty());
+ this(cfg, hoodieSparkContext, fs, conf, Option.empty());
}
private void initializeTableTypeAndBaseFileFormat() {
@@ -728,7 +731,7 @@ public class HoodieDeltaStreamer implements Serializable {
if (deltaSync != null) {
deltaSync.close();
}
- deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, jssc, fs, hiveConf, this::onInitializingWriteClient);
+ deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, hoodieSparkContext, fs, hiveConf, this::onInitializingWriteClient);
}
@Override
@@ -739,7 +742,7 @@ public class HoodieDeltaStreamer implements Serializable {
if (cfg.isAsyncCompactionEnabled()) {
// set Scheduler Pool.
LOG.info("Setting Spark Pool name for delta-sync to " +
DELTASYNC_POOL_NAME);
- jssc.setLocalProperty("spark.scheduler.pool", DELTASYNC_POOL_NAME);
+ hoodieSparkContext.setProperty(EngineProperty.DELTASYNC_POOL_NAME, DELTASYNC_POOL_NAME);
}
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
@@ -867,10 +870,10 @@ public class HoodieDeltaStreamer implements Serializable {
// Update the write client used by Async Compactor.
asyncCompactService.get().updateWriteClient(writeClient);
} else {
- asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient));
+ asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(hoodieSparkContext, writeClient));
// Enqueue existing pending compactions first
HoodieTableMetaClient meta =
- HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
+ HoodieTableMetaClient.builder().setConf(new Configuration(hoodieSparkContext.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
asyncCompactService.get().start(error -> true);
@@ -889,9 +892,9 @@ public class HoodieDeltaStreamer implements Serializable {
if (asyncClusteringService.isPresent()) {
asyncClusteringService.get().updateWriteClient(writeClient);
} else {
- asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(new HoodieSparkEngineContext(jssc), writeClient));
+ asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(hoodieSparkContext, writeClient));
HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
- .setConf(new Configuration(jssc.hadoopConfiguration()))
+ .setConf(new Configuration(hoodieSparkContext.hadoopConfiguration()))
.setBasePath(cfg.targetBasePath)
.setLoadActiveTimelineOnLoad(true).build();
List<HoodieInstant> pending = ClusteringUtils.getPendingClusteringInstantTimes(meta);
@@ -942,6 +945,11 @@ public class HoodieDeltaStreamer implements Serializable {
return props;
}
+ @VisibleForTesting
+ public HoodieSparkEngineContext getHoodieSparkContext() {
+ return hoodieSparkContext;
+ }
+
@VisibleForTesting
public DeltaSync getDeltaSync() {
return deltaSync;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 072e20c4c47..ad71eb51526 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -18,6 +18,7 @@
package org.apache.hudi.utilities.sources;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -286,7 +287,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
private BaseErrorTableWriter getAnonymousErrorTableWriter(TypedProperties props) {
return new BaseErrorTableWriter<ErrorEvent<String>>(new HoodieDeltaStreamer.Config(),
- spark(), props, jsc(), fs()) {
+ spark(), props, new HoodieSparkEngineContext(jsc()), fs()) {
List<JavaRDD<HoodieAvroRecord>> errorEvents = new LinkedList();
@Override
@@ -305,7 +306,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
}
};
}
-
+
@Test
public void testAppendKafkaOffset() {
final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";