This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 05adfa2930 [HUDI-3959] Rename class name for spark rdd reader (#5409)
05adfa2930 is described below
commit 05adfa2930166e8c3ac0ee905ee5cc4bb0530cce
Author: simonsssu <[email protected]>
AuthorDate: Sun Sep 18 06:16:52 2022 +0800
[HUDI-3959] Rename class name for spark rdd reader (#5409)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../cli/functional/CLIFunctionalTestHarness.java | 4 +-
.../org/apache/hudi/client/HoodieReadClient.java | 199 +--------------------
...odieReadClient.java => SparkRDDReadClient.java} | 12 +-
.../apache/hudi/client/TestHoodieReadClient.java | 8 +-
.../hudi/table/TestHoodieMergeOnReadTable.java | 4 +-
.../table/action/compact/CompactionTestBase.java | 4 +-
.../table/action/compact/TestAsyncCompaction.java | 20 +--
.../table/action/compact/TestInlineCompaction.java | 20 +--
.../hudi/testutils/FunctionalTestHarness.java | 4 +-
.../hudi/testutils/HoodieClientTestHarness.java | 8 +-
.../hudi/testutils/HoodieClientTestUtils.java | 4 +-
.../SparkClientFunctionalTestHarness.java | 4 +-
.../quickstart/TestHoodieSparkQuickstart.java | 4 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 4 +-
.../apache/hudi/utilities/TestHoodieIndexer.java | 4 +-
.../hudi/utilities/TestHoodieRepairTool.java | 4 +-
16 files changed, 62 insertions(+), 245 deletions(-)
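For downstream users this is a drop-in rename: SparkRDDReadClient carries the exact constructor and method surface that HoodieReadClient had, and the old name is kept as a deprecated forwarding subclass (see the HoodieReadClient.java diff below). A minimal migration sketch, assuming a Hudi/Spark classpath; the table path and payload type are hypothetical placeholders, not part of this commit:

    import org.apache.hudi.client.SparkRDDReadClient;
    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.common.model.HoodieAvroPayload;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ReadClientMigration {
      public static SparkRDDReadClient<HoodieAvroPayload> openReadClient(JavaSparkContext jsc) {
        HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
        // Before this commit: new HoodieReadClient<>(engineContext, "/data/hudi/trips")
        return new SparkRDDReadClient<>(engineContext, "/data/hudi/trips");  // hypothetical base path
      }
    }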
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
index 7a12a6692a..a8f27c3d6b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
@@ -19,7 +19,7 @@
package org.apache.hudi.cli.functional;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
@@ -107,7 +107,7 @@ public class CLIFunctionalTestHarness implements SparkProvider {
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
- HoodieReadClient.addHoodieSupport(sparkConf);
+ SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
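This harness change shows the bootstrap pattern repeated across every harness in this commit: register the write client's Kryo classes, then let the read client tune the SparkConf before the session is built (addHoodieSupport only sets spark.sql.hive.convertMetastoreParquet=false, as visible in the removed code further down). A condensed sketch, with a hypothetical app name and master:

    import org.apache.hudi.client.SparkRDDReadClient;
    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    public class HudiSparkBootstrap {
      public static SparkSession newSession() {
        SparkConf sparkConf = new SparkConf().setAppName("hudi-app").setMaster("local[2]");
        SparkRDDWriteClient.registerClasses(sparkConf);  // Kryo registration for write-path classes
        SparkRDDReadClient.addHoodieSupport(sparkConf);  // sets spark.sql.hive.convertMetastoreParquet=false
        return SparkSession.builder().config(sparkConf).getOrCreate();
      }
    }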
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 97e54070cf..7277479f64 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -18,217 +18,34 @@
package org.apache.hudi.client;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.CompactionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.SparkHoodieIndexFactory;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.types.StructType;
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
/**
* Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
+ *
+ * @deprecated Use {@link SparkRDDReadClient} instead.
*/
-public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Serializable {
+@Deprecated
+public class HoodieReadClient<T extends HoodieRecordPayload<T>> extends SparkRDDReadClient<T> {
- private static final long serialVersionUID = 1L;
-
- /**
- * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple
- * base path pointing to the table. Until, then just always assume a BloomIndex
- */
- private final transient HoodieIndex<?, ?> index;
- private HoodieTable hoodieTable;
- private transient Option<SQLContext> sqlContextOpt;
- private final transient HoodieSparkEngineContext context;
- private final transient Configuration hadoopConf;
-
- /**
- * @param basePath path to Hoodie table
- */
public HoodieReadClient(HoodieSparkEngineContext context, String basePath) {
- this(context, HoodieWriteConfig.newBuilder().withPath(basePath)
- // by default we use HoodieBloomIndex
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
+ super(context, basePath);
}
- /**
- * @param context
- * @param basePath
- * @param sqlContext
- */
public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
- this(context, basePath);
- this.sqlContextOpt = Option.of(sqlContext);
+ super(context, basePath, sqlContext);
}
- /**
- * Initializes the {@link HoodieReadClient} with engine context, base path, SQL context and index type.
- *
- * @param context Hudi Spark engine context
- * @param basePath Base path of the table
- * @param sqlContext {@link SQLContext} instance
- * @param indexType Hudi index type
- */
public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext, HoodieIndex.IndexType indexType) {
- this(context, HoodieWriteConfig.newBuilder().withPath(basePath)
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()).build());
+ super(context, basePath, sqlContext, indexType);
}
- /**
- * @param clientConfig instance of HoodieWriteConfig
- */
public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
- this.context = context;
- this.hadoopConf = context.getHadoopConf().get();
- final String basePath = clientConfig.getBasePath();
- // Create a Hoodie table which encapsulated the commits and files visible
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
- this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
- this.index = SparkHoodieIndexFactory.createIndex(clientConfig);
- this.sqlContextOpt = Option.empty();
- }
-
- /**
- * Adds support for accessing Hoodie built tables from SparkSQL, as you normally would.
- *
- * @return SparkConf object to be used to construct the SparkContext by caller
- */
- public static SparkConf addHoodieSupport(SparkConf conf) {
- conf.set("spark.sql.hive.convertMetastoreParquet", "false");
- return conf;
- }
-
- private void assertSqlContext() {
- if (!sqlContextOpt.isPresent()) {
- throw new IllegalStateException("SQLContext must be set, when performing dataframe operations");
- }
- }
-
- private Option<String> convertToDataFilePath(Option<Pair<String, String>> partitionPathFileIDPair) {
- if (partitionPathFileIDPair.isPresent()) {
- HoodieBaseFile dataFile = hoodieTable.getBaseFileOnlyView()
- .getLatestBaseFile(partitionPathFileIDPair.get().getLeft(), partitionPathFileIDPair.get().getRight()).get();
- return Option.of(dataFile.getPath());
- } else {
- return Option.empty();
- }
- }
-
- /**
- * Given a bunch of hoodie keys, fetches all the individual records out as a data frame.
- *
- * @return a dataframe
- */
- public Dataset<Row> readROView(JavaRDD<HoodieKey> hoodieKeys, int parallelism) {
- assertSqlContext();
- JavaPairRDD<HoodieKey, Option<Pair<String, String>>> lookupResultRDD = checkExists(hoodieKeys);
- JavaPairRDD<HoodieKey, Option<String>> keyToFileRDD =
- lookupResultRDD.mapToPair(r -> new Tuple2<>(r._1, convertToDataFilePath(r._2)));
- List<String> paths = keyToFileRDD.filter(keyFileTuple -> keyFileTuple._2().isPresent())
- .map(keyFileTuple -> keyFileTuple._2().get()).collect();
-
- // record locations might be same for multiple keys, so need a unique list
- Set<String> uniquePaths = new HashSet<>(paths);
- Dataset<Row> originalDF = null;
- // read files based on the file extension name
- if (paths.size() == 0 || paths.get(0).endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- originalDF = sqlContextOpt.get().read().parquet(uniquePaths.toArray(new String[uniquePaths.size()]));
- } else if (paths.get(0).endsWith(HoodieFileFormat.ORC.getFileExtension())) {
- originalDF = sqlContextOpt.get().read().orc(uniquePaths.toArray(new String[uniquePaths.size()]));
- }
- StructType schema = originalDF.schema();
- JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD().mapToPair(row -> {
- HoodieKey key = new HoodieKey(row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
- row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
- return new Tuple2<>(key, row);
- });
-
- // Now, we need to further filter out, for only rows that match the supplied hoodie keys
- JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple -> tuple._2()._1());
- return sqlContextOpt.get().createDataFrame(rowRDD, schema);
- }
-
- /**
- * Checks if the given [Keys] exists in the hoodie table and returns [Key, Option[FullFilePath]] If the optional
- * FullFilePath value is not present, then the key is not found. If the FullFilePath value is present, it is the path
- * component (without scheme) of the URI underlying file
- */
- public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> checkExists(JavaRDD<HoodieKey> hoodieKeys) {
- return HoodieJavaRDD.getJavaRDD(
- index.tagLocation(HoodieJavaRDD.of(hoodieKeys.map(k -> new HoodieAvroRecord<>(k, null))),
- context, hoodieTable))
- .mapToPair(hr -> new Tuple2<>(hr.getKey(), hr.isCurrentLocationKnown()
- ? Option.of(Pair.of(hr.getPartitionPath(), hr.getCurrentLocation().getFileId()))
- : Option.empty())
- );
- }
-
- /**
- * Filter out HoodieRecords that already exists in the output folder. This is useful in deduplication.
- *
- * @param hoodieRecords Input RDD of Hoodie records.
- * @return A subset of hoodieRecords RDD, with existing records filtered out.
- */
- public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
- JavaRDD<HoodieRecord<T>> recordsWithLocation = tagLocation(hoodieRecords);
- return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
- }
-
- /**
- * Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
- * present). Input RDD should contain no duplicates if needed.
- *
- * @param hoodieRecords Input RDD of Hoodie records
- * @return Tagged RDD of Hoodie records
- */
- public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> hoodieRecords) throws HoodieIndexException {
- return HoodieJavaRDD.getJavaRDD(
- index.tagLocation(HoodieJavaRDD.of(hoodieRecords), context, hoodieTable));
- }
-
- /**
- * Return all pending compactions with instant time for clients to decide what to compact next.
- *
- * @return
- */
- public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
- HoodieTableMetaClient metaClient =
- HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(hoodieTable.getMetaClient().getBasePath()).setLoadActiveTimelineOnLoad(true).build();
- return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
- .map(
- instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
- .collect(Collectors.toList());
+ super(context, clientConfig);
}
}
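The file above is the whole compatibility story: the implementation moves wholesale to SparkRDDReadClient (next diff), while HoodieReadClient shrinks to a @Deprecated subclass whose constructors only forward to super. In general form (illustrative names, not from this commit):

    // The renamed class owns all state and behavior.
    class RenamedClient {
      protected final String basePath;

      RenamedClient(String basePath) {
        this.basePath = basePath;
      }
    }

    // The old name survives as a forwarding shim.
    @Deprecated
    class LegacyClient extends RenamedClient {
      LegacyClient(String basePath) {
        super(basePath);  // forward only; no behavior of its own
      }
    }

Existing callers keep compiling and now simply instantiate the subclass, while new code targets the new name; the shim can be dropped in a later major release.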
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
similarity index 94%
copy from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
copy to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
index 97e54070cf..adddabfdc0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDReadClient.java
@@ -59,7 +59,7 @@ import scala.Tuple2;
/**
* Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
*/
-public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Serializable {
+public class SparkRDDReadClient<T extends HoodieRecordPayload<T>> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -76,7 +76,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Seria
/**
* @param basePath path to Hoodie table
*/
- public HoodieReadClient(HoodieSparkEngineContext context, String basePath) {
+ public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath) {
this(context, HoodieWriteConfig.newBuilder().withPath(basePath)
// by default we use HoodieBloomIndex
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
@@ -87,7 +87,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Seria
* @param basePath
* @param sqlContext
*/
- public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
+ public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext) {
this(context, basePath);
this.sqlContextOpt = Option.of(sqlContext);
}
@@ -100,16 +100,16 @@ public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Seria
* @param sqlContext {@link SQLContext} instance
* @param indexType Hudi index type
*/
- public HoodieReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext, HoodieIndex.IndexType indexType) {
+ public SparkRDDReadClient(HoodieSparkEngineContext context, String basePath, SQLContext sqlContext, HoodieIndex.IndexType indexType) {
this(context, HoodieWriteConfig.newBuilder().withPath(basePath)
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()).build());
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()).build());
this.sqlContextOpt = Option.of(sqlContext);
}
/**
* @param clientConfig instance of HoodieWriteConfig
*/
- public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
+ public SparkRDDReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clientConfig) {
this.context = context;
this.hadoopConf = context.getHadoopConf().get();
final String basePath = clientConfig.getBasePath();
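SparkRDDReadClient retains the key-lookup API shown in the removed HoodieReadClient body above; checkExists, for instance, maps each HoodieKey to an optional (partitionPath, fileId) pair. A consumption sketch, assuming readClient and keys come from surrounding code:

    import org.apache.hudi.client.SparkRDDReadClient;
    import org.apache.hudi.common.model.HoodieAvroPayload;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;

    public class KeyLookup {
      static JavaRDD<HoodieKey> existingKeys(SparkRDDReadClient<HoodieAvroPayload> readClient,
                                             JavaRDD<HoodieKey> keys) {
        JavaPairRDD<HoodieKey, Option<Pair<String, String>>> lookups = readClient.checkExists(keys);
        // A present Option means the key was found in some (partitionPath, fileId).
        return lookups.filter(entry -> entry._2().isPresent()).keys();
      }
    }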
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index 872a4a4215..5ff92fe197 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -85,7 +85,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
@Test
public void testReadROViewFailsWithoutSqlContext() {
- HoodieReadClient readClient = new HoodieReadClient(context, getConfig());
+ SparkRDDReadClient readClient = new SparkRDDReadClient(context, getConfig());
JavaRDD<HoodieKey> recordsRDD = jsc.parallelize(new ArrayList<>(), 1);
assertThrows(IllegalStateException.class, () -> {
readClient.readROView(recordsRDD, 1);
@@ -103,7 +103,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
private void testReadFilterExist(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
- HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(config.getBasePath());
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -119,7 +119,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
// Verify there are no errors
assertNoWriteErrors(statuses);
- HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
+ SparkRDDReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
filteredRDD = anotherReadClient.filterExists(recordsRDD);
List<HoodieRecord> result = filteredRDD.collect();
// Check results
@@ -212,7 +212,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase {
jsc.parallelize(result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
.map(record -> new HoodieAvroRecord(record.getKey(), null)).collect(Collectors.toList()));
// Should have 100 records in table (check using Index), all in locations marked at commit
- HoodieReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
List<HoodieRecord> taggedRecords = readClient.tagLocation(recordRDD).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 0b80d20b39..18f764c1fa 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -18,7 +18,7 @@
package org.apache.hudi.table;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
@@ -233,7 +233,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
JavaRDD<HoodieRecord> updatedRecordsRDD = jsc().parallelize(updatedRecords, 1);
- HoodieReadClient readClient = new HoodieReadClient(context(), config);
+ SparkRDDReadClient readClient = new SparkRDDReadClient(context(), config);
JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
writeClient.startCommitWithTime(newCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index c3f4395b5a..a571a6f473 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -106,7 +106,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
});
}
- protected List<HoodieRecord> runNextDeltaCommits(SparkRDDWriteClient client, final HoodieReadClient readClient, List<String> deltaInstants,
+ protected List<HoodieRecord> runNextDeltaCommits(SparkRDDWriteClient client, final SparkRDDReadClient readClient, List<String> deltaInstants,
List<HoodieRecord> records,
HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
throws Exception {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 87d8613303..f673872804 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -18,7 +18,7 @@
package org.apache.hudi.table.action.compact;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
@@ -60,7 +60,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Rollback inflight compaction
HoodieWriteConfig cfg = getConfig(false);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
@@ -120,7 +120,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
int numRecs = 2000;
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>());
@@ -162,7 +162,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// There is inflight compaction. Subsequent compaction run must work correctly
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
@@ -195,7 +195,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Case: Failure case. Latest pending compaction instant time must be earlier than this instant time
HoodieWriteConfig cfg = getConfig(false);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -226,7 +226,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
HoodieWriteConfig cfg = getConfig(false);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -258,7 +258,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
HoodieWriteConfig cfg = getConfig(false);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
final String firstInstantTime = "001";
final String secondInstantTime = "004";
@@ -293,7 +293,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// No Delta Commits after compaction request
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
@@ -314,7 +314,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Case: Two delta commits before and after compaction schedule
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
@@ -342,7 +342,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Schedule a compaction. Replace those file groups and ensure compaction completes successfully.
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 24d387ec3f..32d2dcda95 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -18,7 +18,7 @@
package org.apache.hudi.table.action.compact;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
@@ -73,7 +73,7 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_COMMITS);
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 100);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
@@ -91,7 +91,7 @@ public class TestInlineCompaction extends CompactionTestBase {
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// third commit, that will trigger compaction
@@ -117,7 +117,7 @@ public class TestInlineCompaction extends CompactionTestBase {
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
// step 1: create and complete 4 delta commit, then create 1 compaction request after this
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
@@ -175,7 +175,7 @@ public class TestInlineCompaction extends CompactionTestBase {
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime), records, cfg, true, new ArrayList<>());
// after 10s, that will trigger compaction
@@ -196,7 +196,7 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_OR_TIME);
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// Then: trigger the compaction because reach 3 commits.
@@ -222,7 +222,7 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_AND_TIME);
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<String> instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
@@ -251,7 +251,7 @@ public class TestInlineCompaction extends CompactionTestBase {
String instantTime2;
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// Schedule compaction instant2, make it in-flight (simulates inline compaction failing)
instantTime2 = HoodieActiveTimeline.createNewInstantTime();
@@ -286,7 +286,7 @@ public class TestInlineCompaction extends CompactionTestBase {
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// Schedule compaction instantTime, make it in-flight (simulates inline compaction failing)
instantTime = HoodieActiveTimeline.createNewInstantTime(10000);
@@ -325,7 +325,7 @@ public class TestInlineCompaction extends CompactionTestBase {
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 10);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ SparkRDDReadClient readClient = getHoodieReadClient(cfg.getBasePath());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// Schedule compaction instantTime, make it in-flight (simulates inline compaction failing)
instantTime = HoodieActiveTimeline.createNewInstantTime();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 11c615a765..9d28577059 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -19,7 +19,7 @@
package org.apache.hudi.testutils;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -139,7 +139,7 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
- HoodieReadClient.addHoodieSupport(sparkConf);
+ SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 564870a4ca..e6a4d63e8c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -28,7 +28,7 @@ import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -132,7 +132,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
protected transient ExecutorService executorService;
protected transient HoodieTableMetaClient metaClient;
protected transient SparkRDDWriteClient writeClient;
- protected transient HoodieReadClient readClient;
+ protected transient SparkRDDReadClient readClient;
protected transient HoodieTableFileSystemView tableView;
protected transient TimelineService timelineService;
@@ -481,8 +481,8 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
}
- public HoodieReadClient getHoodieReadClient(String basePath) {
- readClient = new HoodieReadClient(context, basePath, SQLContext.getOrCreate(jsc.sc()));
+ public SparkRDDReadClient getHoodieReadClient(String basePath) {
+ readClient = new SparkRDDReadClient(context, basePath, SQLContext.getOrCreate(jsc.sc()));
return readClient;
}
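The helper above also shows which constructor most callers want: supplying a SQLContext at construction time is what enables the DataFrame-returning readROView path; without one, the client throws IllegalStateException, as testReadROViewFailsWithoutSqlContext asserts earlier in this diff. A sketch under the same assumptions (context, jsc, and basePath supplied by the harness; the payload type is an illustrative choice):

    import org.apache.hudi.client.SparkRDDReadClient;
    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.common.model.HoodieAvroPayload;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    public class RowLookup {
      static Dataset<Row> readRows(HoodieSparkEngineContext context, JavaSparkContext jsc,
                                   String basePath, JavaRDD<HoodieKey> keys) {
        SparkRDDReadClient<HoodieAvroPayload> readClient =
            new SparkRDDReadClient<>(context, basePath, SQLContext.getOrCreate(jsc.sc()));
        return readClient.readROView(keys, 1);  // parallelism 1, as in the tests
      }
    }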
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index 3387dd24bb..458af3ad9e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -19,7 +19,7 @@
package org.apache.hudi.testutils;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -100,7 +100,7 @@ public class HoodieClientTestUtils {
sparkConf.set("spark.eventLog.dir", evlogDir);
}
- return HoodieReadClient.addHoodieSupport(sparkConf);
+ return SparkRDDReadClient.addHoodieSupport(sparkConf);
}
private static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline,
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index ba1afbebb2..cb7b2e6b3c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -185,7 +185,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
- HoodieReadClient.addHoodieSupport(sparkConf);
+ SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
index 32c51788ee..c23db7f8e7 100644
--- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
@@ -18,7 +18,7 @@
package org.apache.hudi.examples.quickstart;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -95,7 +95,7 @@ public class TestHoodieSparkQuickstart implements SparkProvider {
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
- HoodieReadClient.addHoodieSupport(sparkConf);
+ SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 5d3e0bc3eb..ee807f49da 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -21,7 +21,7 @@ package org.apache.hudi;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -246,7 +246,7 @@ public class DataSourceUtils {
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig) {
try {
- HoodieReadClient client = new HoodieReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);
+ SparkRDDReadClient client = new SparkRDDReadClient<>(new HoodieSparkEngineContext(jssc), writeConfig);
return client.tagLocation(incomingHoodieRecords)
.filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
} catch (TableNotFoundException e) {
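This is the one production call site touched by the rename: dropDuplicates tags an incoming batch against the table's index and keeps only records whose location is unknown, i.e. true inserts. Usage sketch, with jssc, incoming, and writeConfig assumed to be set up by the caller:

    // Records already present in the table are filtered out before writing.
    JavaRDD<HoodieRecord> freshRecords = DataSourceUtils.dropDuplicates(jssc, incoming, writeConfig);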
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 9c4fc07666..87afd56d83 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -21,7 +21,7 @@ package org.apache.hudi.utilities;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -89,7 +89,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
- HoodieReadClient.addHoodieSupport(sparkConf);
+ SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
index 8d3917f066..00cf3ae883 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
@@ -20,7 +20,7 @@
package org.apache.hudi.utilities;
import org.apache.hudi.HoodieTestCommitGenerator;
-import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -94,7 +94,7 @@ public class TestHoodieRepairTool extends HoodieCommonTestHarness implements Spa
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
- HoodieReadClient.addHoodieSupport(sparkConf);
+ SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
jsc = new JavaSparkContext(spark.sparkContext());