This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new ee5b32f [HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)
ee5b32f is described below
commit ee5b32f5d4aa26e7fc58ccdae46935f063460920
Author: vinoyang <[email protected]>
AuthorDate: Sat Mar 7 01:59:35 2020 +0800
[HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)
* Removed timeline server support
* Removed try-with-resources
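
In effect, the read client trades inheritance for composition: instead of
extending AbstractHoodieClient, HoodieReadClient now implements Serializable
and keeps its own transient JavaSparkContext. A condensed, hypothetical sketch
of that pattern (SparkContextStub and PayloadStub are stand-ins, not the real
Spark/Hudi types):

    import java.io.Serializable;

    // Stand-in types, used only to keep this sketch self-contained.
    class SparkContextStub {}
    interface PayloadStub {}

    // Before: ReadClient<T> extended an abstract client that owned the Spark
    // context and the embedded timeline-server lifecycle. After: it is a
    // plain Serializable class holding the context it needs directly.
    class ReadClient<T extends PayloadStub> implements Serializable {
      // transient: the Spark context is never shipped with the client
      private final transient SparkContextStub jsc;

      ReadClient(SparkContextStub jsc) {
        this.jsc = jsc;
      }
    }

Because the client no longer inherits the base class's lifecycle (and,
presumably, the AutoCloseable contract that came with it), callers construct
it directly instead of wrapping it in try-with-resources, as the test diffs
below show.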
---
.../org/apache/hudi/client/HoodieReadClient.java | 9 ++-
.../apache/hudi/client/TestHoodieReadClient.java | 63 ++++++++---------
.../apache/hudi/table/TestMergeOnReadTable.java | 82 +++++++++++-----------
.../hudi/table/compact/TestAsyncCompaction.java | 25 +++----
.../main/java/org/apache/hudi/DataSourceUtils.java | 3 +-
5 files changed, 88 insertions(+), 94 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index e08ec34..33d661b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructType;
+import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -56,7 +56,7 @@ import scala.Tuple2;
/**
 * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
*/
-public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
+public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);
@@ -65,9 +65,9 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
 * basepath pointing to the table. Until, then just always assume a BloomIndex
*/
private final transient HoodieIndex<T> index;
- private final HoodieTimeline commitTimeline;
private HoodieTable hoodieTable;
private transient Option<SQLContext> sqlContextOpt;
+ private final transient JavaSparkContext jsc;
/**
* @param basePath path to Hoodie table
@@ -108,12 +108,11 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
*/
public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
- super(jsc, clientConfig, timelineService);
+ this.jsc = jsc;
final String basePath = clientConfig.getBasePath();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
- this.commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
this.index = HoodieIndex.createIndex(clientConfig, jsc);
this.sqlContextOpt = Option.empty();
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index c57da14..6329e08 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -96,8 +96,8 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
*/
private void testReadFilterExist(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
- try (HoodieWriteClient writeClient = getHoodieWriteClient(config);
- HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());) {
+ try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+ HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -113,37 +113,36 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);
- try (HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());) {
- filteredRDD = anotherReadClient.filterExists(recordsRDD);
- List<HoodieRecord> result = filteredRDD.collect();
- // Check results
- assertEquals(25, result.size());
-
- // check path exists for written keys
- JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
- anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
- JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
- .map(keyPath -> keyPath._1);
- assertEquals(75, keysWithPaths.count());
-
- // verify rows match inserted records
- Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
- assertEquals(75, rows.count());
-
- JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
- .map(keyPath -> keyPath._1);
-
- try {
- anotherReadClient.readROView(keysWithoutPaths, 1);
- } catch (Exception e) {
- // data frame reader throws exception for empty records. ignore the error.
- assertEquals(e.getClass(), AnalysisException.class);
- }
-
- // Actual tests of getPendingCompactions method are in TestAsyncCompaction
- // This is just testing empty list
- assertEquals(0, anotherReadClient.getPendingCompactions().size());
+ HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
+ filteredRDD = anotherReadClient.filterExists(recordsRDD);
+ List<HoodieRecord> result = filteredRDD.collect();
+ // Check results
+ assertEquals(25, result.size());
+
+ // check path exists for written keys
+ JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
+ anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
+ JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
+ .map(keyPath -> keyPath._1);
+ assertEquals(75, keysWithPaths.count());
+
+ // verify rows match inserted records
+ Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
+ assertEquals(75, rows.count());
+
+ JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
+ .map(keyPath -> keyPath._1);
+
+ try {
+ anotherReadClient.readROView(keysWithoutPaths, 1);
+ } catch (Exception e) {
+ // data frame reader throws exception for empty records. ignore the error.
+ assertEquals(e.getClass(), AnalysisException.class);
}
+
+ // Actual tests of getPendingCompactions method are in TestAsyncCompaction
+ // This is just testing empty list
+ assertEquals(0, anotherReadClient.getPendingCompactions().size());
}
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index ab27920..740caf2 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -759,54 +759,54 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
- try (HoodieReadClient readClient = new HoodieReadClient(jsc, config);) {
- updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
- // Write them to corresponding avro logfiles
- HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
- HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+ HoodieReadClient readClient = new HoodieReadClient(jsc, config);
+ updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
- // Verify that all data file has one log file
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
- // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
- ((SyncableFileSystemView) (table.getSliceView())).reset();
-
- for (String partitionPath : dataGen.getPartitionPaths()) {
- List<FileSlice> groupedLogFiles =
- table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
- for (FileSlice fileSlice : groupedLogFiles) {
- assertEquals("There should be 1 log file written for every data
file", 1, fileSlice.getLogFiles().count());
- }
+ // Write them to corresponding avro logfiles
+ HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
+ HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+
+ // Verify that all data file has one log file
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+ // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
+ ((SyncableFileSystemView) (table.getSliceView())).reset();
+
+ for (String partitionPath : dataGen.getPartitionPaths()) {
+ List<FileSlice> groupedLogFiles =
+ table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ for (FileSlice fileSlice : groupedLogFiles) {
+ assertEquals("There should be 1 log file written for every data
file", 1, fileSlice.getLogFiles().count());
}
+ }
- // Mark 2nd delta-instant as completed
- metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
- HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
- metaClient.getActiveTimeline().saveAsComplete(
- new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
+ // Mark 2nd delta-instant as completed
+ metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
+ HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
+ metaClient.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
- // Do a compaction
- String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
- JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
+ // Do a compaction
+ String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
+ JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
- // Verify that recently written compacted data file has no log file
- metaClient = HoodieTableMetaClient.reload(metaClient);
- table = HoodieTable.getHoodieTable(metaClient, config, jsc);
- HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
-
- assertTrue("Compaction commit should be > than last insert",
HoodieTimeline
- .compareTimestamps(timeline.lastInstant().get().getTimestamp(),
newCommitTime, HoodieTimeline.GREATER));
-
- for (String partitionPath : dataGen.getPartitionPaths()) {
- List<FileSlice> groupedLogFiles =
- table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
- for (FileSlice slice : groupedLogFiles) {
- assertEquals("After compaction there should be no log files
visible on a full view", 0, slice.getLogFiles().count());
- }
- List<WriteStatus> writeStatuses = result.collect();
- assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
+ // Verify that recently written compacted data file has no log file
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+
+ assertTrue("Compaction commit should be > than last insert",
HoodieTimeline
+ .compareTimestamps(timeline.lastInstant().get().getTimestamp(),
newCommitTime, HoodieTimeline.GREATER));
+
+ for (String partitionPath : dataGen.getPartitionPaths()) {
+ List<FileSlice> groupedLogFiles =
+ table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ for (FileSlice slice : groupedLogFiles) {
+ assertEquals("After compaction there should be no log files visible
on a full view", 0, slice.getLogFiles().count());
}
+ List<WriteStatus> writeStatuses = result.collect();
+ assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
}
}
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
index e81fa99..1a366a4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestAsyncCompaction.java
@@ -92,9 +92,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testRollbackForInflightCompaction() throws Exception {
// Rollback inflight compaction
HoodieWriteConfig cfg = getConfig(false);
- try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+ try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
@@ -155,9 +154,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;
- try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+ try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true, new ArrayList<>());
@@ -197,9 +195,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testInflightCompaction() throws Exception {
// There is inflight compaction. Subsequent compaction run must work correctly
HoodieWriteConfig cfg = getConfig(true);
- try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+ try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
@@ -351,9 +348,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testCompactionAfterTwoDeltaCommits() throws Exception {
// No Delta Commits after compaction request
HoodieWriteConfig cfg = getConfig(true);
- try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+ try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
@@ -373,9 +369,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testInterleavedCompaction() throws Exception {
// Case: Two delta commits before and after compaction schedule
HoodieWriteConfig cfg = getConfig(true);
- try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
- HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
-
+ try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index a2dfe02..6a4ad03 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -223,7 +223,8 @@ public class DataSourceUtils {
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
- try (HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService)) {
+ try {
+ HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
return client.tagLocation(incomingHoodieRecords)
.filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
} catch (TableNotFoundException e) {