This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f93e64f  [HUDI-681]Remove embeddedTimelineService from HoodieReadClient (#1388)
f93e64f is described below
commit f93e64fee413ed1b774156e688794ee7937cc01a
Author: hongdd <[email protected]>
AuthorDate: Mon Mar 9 18:31:04 2020 +0800
[HUDI-681]Remove embeddedTimelineService from HoodieReadClient (#1388)
* [HUDI-681]Remove embeddedTimelineService from HoodieReadClient
---
.../org/apache/hudi/client/HoodieReadClient.java | 21 ++-------------------
.../main/java/org/apache/hudi/DataSourceUtils.java | 10 ++++------
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +----
.../hudi/utilities/deltastreamer/DeltaSync.java | 2 +-
4 files changed, 8 insertions(+), 30 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 33d661b..d1e92b5 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -19,7 +19,6 @@
package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -72,18 +71,10 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
/**
* @param basePath path to Hoodie table
*/
-  public HoodieReadClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineService) {
+  public HoodieReadClient(JavaSparkContext jsc, String basePath) {
this(jsc, HoodieWriteConfig.newBuilder().withPath(basePath)
// by default we use HoodieBloomIndex
-        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(),
- timelineService);
- }
-
- /**
- * @param basePath path to Hoodie table
- */
- public HoodieReadClient(JavaSparkContext jsc, String basePath) {
- this(jsc, basePath, Option.empty());
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
}
/**
@@ -100,14 +91,6 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
* @param clientConfig instance of HoodieWriteConfig
*/
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
- this(jsc, clientConfig, Option.empty());
- }
-
- /**
- * @param clientConfig instance of HoodieWriteConfig
- */
- public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
- Option<EmbeddedTimelineService> timelineService) {
this.jsc = jsc;
final String basePath = clientConfig.getBasePath();
// Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 6a4ad03..99a795d 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -23,11 +23,9 @@ import org.apache.avro.Schema;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -222,9 +220,9 @@ public class DataSourceUtils {
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
JavaRDD<HoodieRecord> incomingHoodieRecords,
-                                                     HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
+                                                     HoodieWriteConfig writeConfig) {
try {
-      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
+      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
} catch (TableNotFoundException e) {
@@ -236,10 +234,10 @@ public class DataSourceUtils {
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
JavaRDD<HoodieRecord> incomingHoodieRecords,
-                                                     Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
+                                                     Map<String, String> parameters) {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
-    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);
+ return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
}
   public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 80a01d3..326595f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -131,10 +131,7 @@ private[hudi] object HoodieSparkSqlWriter {
val hoodieRecords =
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
- DataSourceUtils.dropDuplicates(
- jsc,
- hoodieAllIncomingRecords,
- mapAsJavaMap(parameters), client.getTimelineServer)
+        DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
} else {
hoodieAllIncomingRecords
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 97d3d42..4b69d22 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -357,7 +357,7 @@ public class DeltaSync implements Serializable {
if (cfg.filterDupes) {
// turn upserts to insert
       cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
-      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), writeClient.getTimelineServer());
+      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
}
boolean isEmpty = records.isEmpty();