This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5823c1e HUDI-138 - Meta Files handling also need to support
consistency guard
5823c1e is described below
commit 5823c1ebd7e12a48805eed05532edfd25b69c6fe
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Thu Jun 20 18:05:01 2019 -0700
HUDI-138 - Meta Files handling also need to support consistency guard
---
.../main/java/com/uber/hoodie/cli/HoodieCLI.java | 26 +++-
.../uber/hoodie/cli/commands/CleansCommand.java | 6 +-
.../uber/hoodie/cli/commands/CommitsCommand.java | 5 +-
.../uber/hoodie/cli/commands/DatasetsCommand.java | 27 +++-
.../hoodie/cli/commands/SavepointsCommand.java | 6 +-
.../java/com/uber/hoodie/AbstractHoodieClient.java | 6 +
.../com/uber/hoodie/CompactionAdminClient.java | 6 +-
.../java/com/uber/hoodie/HoodieWriteClient.java | 59 ++++----
.../com/uber/hoodie/client/utils/ClientUtils.java | 39 ++++++
.../com/uber/hoodie/config/HoodieWriteConfig.java | 49 +++----
.../java/com/uber/hoodie/io/HoodieReadHandle.java | 1 -
.../java/com/uber/hoodie/io/HoodieWriteHandle.java | 15 +-
.../java/com/uber/hoodie/table/HoodieTable.java | 11 +-
.../src/test/java/com/uber/hoodie/TestCleaner.java | 8 +-
.../java/com/uber/hoodie/TestConsistencyGuard.java | 21 ++-
.../java/com/uber/hoodie/TestHoodieClientBase.java | 10 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 10 +-
.../common/io/storage/HoodieWrapperFileSystem.java | 151 +++++++++++++++++----
.../hoodie/common/table/HoodieTableMetaClient.java | 38 +++++-
.../hoodie/common/util/ConsistencyGuardConfig.java | 121 +++++++++++++++++
.../common/util/FailSafeConsistencyGuard.java | 32 ++---
21 files changed, 481 insertions(+), 166 deletions(-)
diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java
index 67a999a..ba9c023 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java
@@ -19,6 +19,7 @@
package com.uber.hoodie.cli;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.common.util.FSUtils;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@@ -27,8 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
public class HoodieCLI {
public static Configuration conf;
+ public static ConsistencyGuardConfig consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().build();
public static FileSystem fs;
public static CLIState state = CLIState.INIT;
+ public static String basePath;
public static HoodieTableMetaClient tableMetadata;
public static HoodieTableMetaClient syncTableMetadata;
@@ -37,6 +40,18 @@ public class HoodieCLI {
INIT, DATASET, SYNC
}
+ public static void setConsistencyGuardConfig(ConsistencyGuardConfig config) {
+ consistencyGuardConfig = config;
+ }
+
+ private static void setTableMetaClient(HoodieTableMetaClient tableMetadata) {
+ HoodieCLI.tableMetadata = tableMetadata;
+ }
+
+ private static void setBasePath(String basePath) {
+ HoodieCLI.basePath = basePath;
+ }
+
public static boolean initConf() {
if (HoodieCLI.conf == null) {
HoodieCLI.conf = FSUtils.prepareHadoopConf(new Configuration());
@@ -47,11 +62,16 @@ public class HoodieCLI {
public static void initFS(boolean force) throws IOException {
if (fs == null || force) {
- fs = FileSystem.get(conf);
+ fs = (tableMetadata != null) ? tableMetadata.getFs() :
FileSystem.get(conf);
}
}
- public static void setTableMetadata(HoodieTableMetaClient tableMetadata) {
- HoodieCLI.tableMetadata = tableMetadata;
+ public static void refreshTableMetadata() {
+ setTableMetaClient(new HoodieTableMetaClient(HoodieCLI.conf, basePath,
false, HoodieCLI.consistencyGuardConfig));
+ }
+
+ public static void connectTo(String basePath) {
+ setBasePath(basePath);
+ refreshTableMetadata();
}
}
diff --git
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java
index 79b271e..c0f7129 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java
@@ -23,7 +23,6 @@ import
com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.TableHeader;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -91,9 +90,8 @@ public class CleansCommand implements CommandMarker {
@CliCommand(value = "cleans refresh", help = "Refresh the cleans")
public String refreshCleans() throws IOException {
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf,
HoodieCLI.tableMetadata.getBasePath());
- HoodieCLI.setTableMetadata(metadata);
- return "Metadata for table " + metadata.getTableConfig().getTableName() +
" refreshed.";
+ HoodieCLI.refreshTableMetadata();
+ return "Metadata for table " +
HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
}
@CliCommand(value = "clean showpartitions", help = "Show partition level
details of a clean")
diff --git
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java
index 641cb80..2360503 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java
@@ -115,9 +115,8 @@ public class CommitsCommand implements CommandMarker {
@CliCommand(value = "commits refresh", help = "Refresh the commits")
public String refreshCommits() throws IOException {
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf,
HoodieCLI.tableMetadata.getBasePath());
- HoodieCLI.setTableMetadata(metadata);
- return "Metadata for table " + metadata.getTableConfig().getTableName() +
" refreshed.";
+ HoodieCLI.refreshTableMetadata();
+ return "Metadata for table " +
HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
}
@CliCommand(value = "commit rollback", help = "Rollback a commit")
diff --git
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java
index f001f5c..257091e 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.TableHeader;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.exception.DatasetNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -39,11 +40,25 @@ public class DatasetsCommand implements CommandMarker {
@CliCommand(value = "connect", help = "Connect to a hoodie dataset")
public String connect(
- @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the
dataset") final String path)
- throws IOException {
- boolean initialized = HoodieCLI.initConf();
- HoodieCLI.initFS(initialized);
- HoodieCLI.setTableMetadata(new HoodieTableMetaClient(HoodieCLI.conf,
path));
+ @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the
dataset") final String path,
+ @CliOption(key = {"eventuallyConsistent"}, mandatory = false,
unspecifiedDefaultValue = "false",
+ help = "Enable eventual consistency") final boolean
eventuallyConsistent,
+ @CliOption(key = {"initialCheckIntervalMs"}, mandatory = false,
unspecifiedDefaultValue = "2000",
+ help = "Initial wait time for eventual consistency") final Integer
initialConsistencyIntervalMs,
+ @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false,
unspecifiedDefaultValue = "300000",
+ help = "Max wait time for eventual consistency") final Integer
maxConsistencyIntervalMs,
+   @CliOption(key = {"maxChecks"}, mandatory = false,
unspecifiedDefaultValue = "7",
+       help = "Max checks for eventual consistency") final Integer
maxConsistencyChecks) throws IOException {
+ HoodieCLI.setConsistencyGuardConfig(
+ ConsistencyGuardConfig.newBuilder()
+ .withConsistencyCheckEnabled(eventuallyConsistent)
+
.withInitialConsistencyCheckIntervalMs(initialConsistencyIntervalMs)
+ .withMaxConsistencyCheckIntervalMs(maxConsistencyIntervalMs)
+ .withMaxConsistencyChecks(maxConsistencyChecks)
+ .build());
+ HoodieCLI.initConf();
+ HoodieCLI.connectTo(path);
+ HoodieCLI.initFS(true);
HoodieCLI.state = HoodieCLI.CLIState.DATASET;
return "Metadata for table " +
HoodieCLI.tableMetadata.getTableConfig().getTableName() + " loaded";
}
@@ -85,7 +100,7 @@ public class DatasetsCommand implements CommandMarker {
HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name,
payloadClass);
// Now connect to ensure loading works
- return connect(path);
+ return connect(path, false, 0, 0, 0);
}
@CliAvailabilityIndicator({"desc"})
diff --git
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java
index 82358aa..8709469 100644
---
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java
+++
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java
@@ -23,7 +23,6 @@ import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.utils.InputStreamConsumer;
import com.uber.hoodie.cli.utils.SparkUtil;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -133,9 +132,8 @@ public class SavepointsCommand implements CommandMarker {
@CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
public String refreshMetaClient() throws IOException {
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf,
HoodieCLI.tableMetadata.getBasePath());
- HoodieCLI.setTableMetadata(metadata);
- return "Metadata for table " + metadata.getTableConfig().getTableName() +
" refreshed.";
+ HoodieCLI.refreshTableMetadata();
+ return "Metadata for table " +
HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
}
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc,
String basePath) throws Exception {
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
index 5aa051c..c03bd21 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
@@ -19,6 +19,8 @@
package com.uber.hoodie;
import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
+import com.uber.hoodie.client.utils.ClientUtils;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.io.IOException;
@@ -117,4 +119,8 @@ public abstract class AbstractHoodieClient implements
Serializable {
public Optional<EmbeddedTimelineService> getTimelineServer() {
return timelineServer;
}
+
+ protected HoodieTableMetaClient createMetaClient(boolean
loadActiveTimelineOnLoad) {
+ return ClientUtils.createMetaClient(jsc, config, loadActiveTimelineOnLoad);
+ }
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
index d2b007f..03bec59 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
@@ -114,7 +114,7 @@ public class CompactionAdminClient extends
AbstractHoodieClient {
*/
public List<RenameOpResult> unscheduleCompactionPlan(
String compactionInstant, boolean skipValidation, int parallelism,
boolean dryRun) throws Exception {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = createMetaClient(false);
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
getRenamingActionsForUnschedulingCompactionPlan(metaClient,
compactionInstant, parallelism,
Optional.absent(), skipValidation);
@@ -156,7 +156,7 @@ public class CompactionAdminClient extends
AbstractHoodieClient {
*/
public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId
fgId,
boolean skipValidation, boolean dryRun) throws Exception {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = createMetaClient(false);
List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fgId,
Optional.absent(), skipValidation);
@@ -198,7 +198,7 @@ public class CompactionAdminClient extends
AbstractHoodieClient {
*/
public List<RenameOpResult> repairCompaction(String compactionInstant,
int parallelism, boolean dryRun) throws Exception {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = createMetaClient(false);
List<ValidationOpResult> validationResults =
validateCompactionPlan(metaClient, compactionInstant, parallelism);
List<ValidationOpResult> failed = validationResults.stream()
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index a9b548b..3c8b36c 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -76,7 +76,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -153,7 +152,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>>
hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
JavaRDD<HoodieRecord<T>> recordsWithLocation =
index.tagLocation(hoodieRecords, jsc, table);
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
@@ -471,9 +470,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc,
table);
// Trigger the insert and collect statuses
statuses = statuses.persist(config.getWriteStatusStorageLevel());
- commitOnAutoCommit(commitTime, statuses,
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true)
- .getCommitActionType());
+ commitOnAutoCommit(commitTime, statuses,
table.getMetaClient().getCommitActionType());
return statuses;
}
@@ -496,7 +493,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
Optional<Map<String, String>> extraMetadata) {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true);
+ HoodieTableMetaClient metaClient = createMetaClient(false);
return commit(commitTime, writeStatuses, extraMetadata,
metaClient.getCommitActionType());
}
@@ -506,7 +503,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
logger.info("Commiting " + commitTime);
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -536,7 +533,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
// We cannot have unbounded commit files. Archive commits if we have to
archive
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config,
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true));
+ createMetaClient(true));
archiveLog.archiveIfRequired(jsc);
if (config.isAutoClean()) {
// Call clean to cleanup if there is anything to cleanup after the
commit,
@@ -580,7 +577,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public boolean savepoint(String user, String comment) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline
is empty");
}
@@ -610,7 +607,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public boolean savepoint(String commitTime, String user, String comment) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ)
{
throw new UnsupportedOperationException("Savepointing is not supported
or MergeOnRead table types");
}
@@ -674,7 +671,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ)
{
throw new UnsupportedOperationException("Savepointing is not supported
or MergeOnRead table types");
}
@@ -705,7 +702,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
private void deleteRequestedCompaction(String compactionTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant compactionRequestedInstant =
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION,
compactionTime);
@@ -734,7 +731,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public boolean rollbackToSavepoint(String savepointTime) {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Rollback to savepoint is expected to be a manual operation and no
concurrent write or compaction is expected
@@ -788,7 +785,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
// Get all the commits on the timeline after the provided commit time
List<HoodieInstant> instantsToRollback =
table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants()
.filter(instant ->
HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
@@ -848,7 +845,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
private List<HoodieRollbackStat> doRollbackAndGetStats(final String
commitToRollback) throws
IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
// Check if any of the commits is a savepoint - do not allow rollback on
those commits
@@ -899,7 +896,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
private void finishRollback(final Timer.Context context,
List<HoodieRollbackStat> rollbackStats,
List<String> commitsToRollback, final String startRollbackTime) throws
IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
Optional<Long> durationInMs = Optional.empty();
Long numFilesDeleted = rollbackStats.stream().mapToLong(stat ->
stat.getSuccessDeleteFiles().size()).sum();
if (context != null) {
@@ -925,7 +922,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
private void finishRestore(final Timer.Context context, Map<String,
List<HoodieRollbackStat>> commitToStats,
List<String> commitsToRollback, final String startRestoreTime, final
String restoreToInstant) throws IOException {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
Optional<Long> durationInMs = Optional.empty();
Long numFilesDeleted = 0L;
for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat :
commitToStats.entrySet()) {
@@ -1001,7 +998,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
List<HoodieCleanStat> cleanStats = table.clean(jsc);
if (cleanStats.isEmpty()) {
@@ -1053,7 +1050,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
rollbackInflightCommits();
}
logger.info("Generate a new instant time " + instantTime);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath());
+ HoodieTableMetaClient metaClient = createMetaClient(true);
// if there are pending compactions, their instantTime must not be greater
than that of this instant time
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending
-> {
Preconditions.checkArgument(
@@ -1086,8 +1083,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public boolean scheduleCompactionAtInstant(String instantTime,
Optional<Map<String, String>> extraMetadata)
throws IOException {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(),
- config.getBasePath(), true);
+ HoodieTableMetaClient metaClient = createMetaClient(true);
// if there are inflight writes, their instantTime must not be less than
that of compaction instant time
metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight
-> {
Preconditions.checkArgument(
@@ -1130,8 +1126,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
public void commitCompaction(String compactionInstantTime,
JavaRDD<WriteStatus> writeStatuses,
Optional<Map<String, String>> extraMetadata) throws IOException {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(),
- config.getBasePath(), true);
+ HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
@@ -1178,7 +1173,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
private void rollbackInflightCommits() {
HoodieTable<T> table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
HoodieTimeline inflightTimeline =
table.getMetaClient().getCommitsTimeline().filterInflightsExcludingCompaction();
List<String> commits =
inflightTimeline.getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
@@ -1190,11 +1185,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) {
// Create a Hoodie table which encapsulated the commits and files visible
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(
- // Clone Configuration here. Otherwise we could see
ConcurrentModificationException (race) in multi-threaded
- // execution (HoodieDeltaStreamer) when Configuration gets
serialized by Spark.
- new Configuration(jsc.hadoopConfiguration()),
config.getBasePath(), true), config, jsc);
+ HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true),
config, jsc);
if
(table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION))
{
writeContext = metrics.getCommitCtx();
} else {
@@ -1214,8 +1205,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
*/
private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean
autoCommit) throws IOException {
// Create a Hoodie table which encapsulated the commits and files visible
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(),
- config.getBasePath(), true);
+ HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTimeline pendingCompactionTimeline =
metaClient.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant =
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -1223,7 +1213,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
//inflight compaction - Needs to rollback first deleting new parquet
files before we run compaction.
rollbackInflightCompaction(inflightInstant, table);
// refresh table
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true);
+ metaClient = createMetaClient(true);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
pendingCompactionTimeline =
metaClient.getActiveTimeline().filterPendingCompactionTimeline();
}
@@ -1253,8 +1243,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
compactionTimer = metrics.getCompactionCtx();
// Create a Hoodie table which encapsulated the commits and files visible
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(),
- config.getBasePath(), true);
+ HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<WriteStatus> statuses = table.compact(jsc,
compactionInstant.getTimestamp(), compactionPlan);
// Force compaction action
@@ -1383,7 +1372,7 @@ public class HoodieWriteClient<T extends
HoodieRecordPayload> extends AbstractHo
try {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config, jsc);
+ createMetaClient(true), config, jsc);
// 0. All of the rolling stat management is only done by the DELTA
commit for MOR and COMMIT for COW other wise
// there may be race conditions
HoodieRollingStatMetadata rollingStatMetadata = new
HoodieRollingStatMetadata(actionType);
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java
b/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java
new file mode 100644
index 0000000..e9859f9
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.client.utils;
+
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class ClientUtils {
+
+ /**
+ * Create Consistency Aware MetaClient
+ *
+ * @param jsc JavaSparkContext
+ * @param config HoodieWriteConfig
+ * @param loadActiveTimelineOnLoad early loading of timeline
+ */
+ public static HoodieTableMetaClient createMetaClient(JavaSparkContext jsc,
HoodieWriteConfig config,
+ boolean loadActiveTimelineOnLoad) {
+ return new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), loadActiveTimelineOnLoad,
+ config.getConsistencyGuardConfig());
+ }
+}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 2d522ad..e78c358 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
@@ -66,10 +67,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS =
WriteStatus.class.getName();
private static final String FINALIZE_WRITE_PARALLELISM =
"hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM =
DEFAULT_PARALLELISM;
- private static final String CONSISTENCY_CHECK_ENABLED_PROP =
"hoodie.consistency.check.enabled";
- private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
+
private static final String EMBEDDED_TIMELINE_SERVER_ENABLED =
"hoodie.embed.timeline.server";
private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED =
"false";
+
private static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP =
"hoodie.fail.on.timeline.archiving";
private static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED =
"true";
// time between successive attempts to ensure written data's metadata is
consistent on storage
@@ -85,6 +86,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String MAX_CONSISTENCY_CHECKS_PROP =
"hoodie.consistency.check.max_checks";
private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+ private ConsistencyGuardConfig consistencyGuardConfig;
+
// Hoodie Write Client transparently rewrites File System View config when
embedded mode is enabled
// We keep track of original config and rewritten config
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
@@ -94,6 +97,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
super(props);
Properties newProps = new Properties();
newProps.putAll(props);
+ this.consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
this.clientSpecifiedViewStorageConfig =
FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
this.viewStorageConfig = clientSpecifiedViewStorageConfig;
}
@@ -162,10 +166,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}
- public boolean isConsistencyCheckEnabled() {
- return
Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
- }
-
public boolean isEmbeddedTimelineServerEnabled() {
return
Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
}
@@ -495,6 +495,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return
Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP));
}
+ public ConsistencyGuardConfig getConsistencyGuardConfig() {
+ return consistencyGuardConfig;
+ }
+
+ public void setConsistencyGuardConfig(ConsistencyGuardConfig
consistencyGuardConfig) {
+ this.consistencyGuardConfig = consistencyGuardConfig;
+ }
+
public FileSystemViewStorageConfig getViewStorageConfig() {
return viewStorageConfig;
}
@@ -520,6 +528,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isMetricsConfigSet = false;
private boolean isMemoryConfigSet = false;
private boolean isViewConfigSet = false;
+ private boolean isConsistencyGuardSet = false;
public Builder fromFile(File propertiesFile) throws IOException {
FileReader reader = new FileReader(propertiesFile);
@@ -639,13 +648,14 @@ public class HoodieWriteConfig extends
DefaultHoodieConfig {
return this;
}
- public Builder withFinalizeWriteParallelism(int parallelism) {
- props.setProperty(FINALIZE_WRITE_PARALLELISM,
String.valueOf(parallelism));
+ public Builder withConsistencyGuardConfig(ConsistencyGuardConfig
consistencyGuardConfig) {
+ props.putAll(consistencyGuardConfig.getProps());
+ isConsistencyGuardSet = true;
return this;
}
- public Builder withConsistencyCheckEnabled(boolean enabled) {
- props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP,
String.valueOf(enabled));
+ public Builder withFinalizeWriteParallelism(int parallelism) {
+ props.setProperty(FINALIZE_WRITE_PARALLELISM,
String.valueOf(parallelism));
return this;
}
@@ -654,21 +664,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return this;
}
- public Builder withInitialConsistencyCheckIntervalMs(int initialIntevalMs)
{
- props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP,
String.valueOf(initialIntevalMs));
- return this;
- }
-
- public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
- props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP,
String.valueOf(maxIntervalMs));
- return this;
- }
-
- public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
- props.setProperty(MAX_CONSISTENCY_CHECKS_PROP,
String.valueOf(maxConsistencyChecks));
- return this;
- }
-
public HoodieWriteConfig build() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM),
INSERT_PARALLELISM,
@@ -691,8 +686,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props,
!props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
- setDefaultOnCondition(props,
!props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
- CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
setDefaultOnCondition(props,
!props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED,
DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
setDefaultOnCondition(props,
!props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
@@ -717,6 +710,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HoodieMemoryConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isViewConfigSet,
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
+ setDefaultOnCondition(props, !isConsistencyGuardSet,
+ ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
// Build WriteConfig at the end
HoodieWriteConfig config = new HoodieWriteConfig(props);
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
index e0d3323..e4b1838 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
@@ -43,7 +43,6 @@ public abstract class HoodieReadHandle<T extends
HoodieRecordPayload> extends Ho
return hoodieTable.getMetaClient().getFs();
}
-
public Pair<String, String> getPartitionPathFilePair() {
return partitionPathFilePair;
}
diff --git
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
index 1adeecc..aa3eec2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
@@ -19,14 +19,11 @@
package com.uber.hoodie.io;
import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieTimer;
-import com.uber.hoodie.common.util.NoOpConsistencyGuard;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
@@ -68,13 +65,6 @@ public abstract class HoodieWriteHandle<T extends
HoodieRecordPayload> extends H
config.getWriteStatusFailureFraction());
}
- private static FileSystem getFileSystem(HoodieTable hoodieTable,
HoodieWriteConfig config) {
- return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(),
config.isConsistencyCheckEnabled()
- ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
- config.getMaxConsistencyChecks(),
config.getInitialConsistencyCheckIntervalMs(),
- config.getMaxConsistencyCheckIntervalMs()) : new
NoOpConsistencyGuard());
- }
-
/**
* Generate a write token based on the currently running spark task and its
place in the spark dag.
*/
@@ -175,9 +165,6 @@ public abstract class HoodieWriteHandle<T extends
HoodieRecordPayload> extends H
@Override
protected FileSystem getFileSystem() {
- return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(),
config.isConsistencyCheckEnabled()
- ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
- config.getMaxConsistencyChecks(),
config.getInitialConsistencyCheckIntervalMs(),
- config.getMaxConsistencyCheckIntervalMs()) : new
NoOpConsistencyGuard());
+ return hoodieTable.getMetaClient().getFs();
}
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 11f545a..7fd9a29 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.table;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
+import com.uber.hoodie.client.utils.ClientUtils;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieRollbackStat;
import com.uber.hoodie.common.SerializableConfiguration;
@@ -83,7 +84,7 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload> implements Seri
this.hadoopConfiguration = new
SerializableConfiguration(jsc.hadoopConfiguration());
this.viewManager = FileSystemViewManager.createViewManager(
new SerializableConfiguration(jsc.hadoopConfiguration()),
config.getViewStorageConfig());
- this.metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true);
+ this.metaClient = ClientUtils.createMetaClient(jsc, config, true);
this.index = HoodieIndex.createIndex(config, jsc);
}
@@ -291,7 +292,7 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload> implements Seri
*/
public void finalizeWrite(JavaSparkContext jsc, String instantTs,
List<HoodieWriteStat> stats)
throws HoodieIOException {
- cleanFailedWrites(jsc, instantTs, stats,
config.isConsistencyCheckEnabled());
+ cleanFailedWrites(jsc, instantTs, stats,
config.getConsistencyGuardConfig().isConsistencyCheckEnabled());
}
/**
@@ -412,7 +413,7 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload> implements Seri
private boolean waitForCondition(String partitionPath, Stream<Pair<String,
String>> partitionFilePaths,
FileVisibility visibility) {
- final FileSystem fileSystem = metaClient.getFs();
+ final FileSystem fileSystem = metaClient.getRawFs();
List<String> fileList =
partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
try {
getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath,
fileList, visibility);
@@ -424,8 +425,6 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload> implements Seri
}
private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
- return new FailSafeConsistencyGuard(fileSystem,
config.getMaxConsistencyChecks(),
- config.getInitialConsistencyCheckIntervalMs(),
- config.getMaxConsistencyCheckIntervalMs());
+ return new FailSafeConsistencyGuard(fileSystem,
config.getConsistencyGuardConfig());
}
}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index 7798805..ffb6daf 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -48,6 +48,7 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.CompactionUtils;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieCompactionConfig;
@@ -195,7 +196,8 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieCompactionConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
.retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1)
- .withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true)
+ .withFinalizeWriteParallelism(1)
+
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
HoodieWriteClient client = getHoodieWriteClient(cfg);
@@ -357,7 +359,9 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1)
-
.withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true).build();
+ .withFinalizeWriteParallelism(1)
+
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .build();
HoodieWriteClient client = getHoodieWriteClient(cfg);
final Function2<List<HoodieRecord>, String, Integer>
recordInsertGenWrappedFunction =
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
index 64d97f8..7712c1c 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
@@ -20,6 +20,7 @@ package com.uber.hoodie;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.util.ConsistencyGuard;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
import java.io.IOException;
@@ -58,7 +59,7 @@ public class TestConsistencyGuard {
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f2");
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f3");
- ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 1, 1000, 1000);
+ ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig(1, 1000, 1000));
passing.waitTillFileAppears(new Path(basePath +
"/partition/path/f1_1-0-1_000.parquet"));
passing.waitTillFileAppears(new Path(basePath +
"/partition/path/f2_1-0-1_000.parquet"));
passing.waitTillAllFilesAppear(basePath + "/partition/path",
@@ -77,7 +78,7 @@ public class TestConsistencyGuard {
@Test(expected = TimeoutException.class)
public void testCheckFailingAppear() throws Exception {
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
- ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+ ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig());
passing.waitTillAllFilesAppear(basePath + "/partition/path",
Arrays.asList(basePath + "/partition/path/f1_1-0-2_000.parquet",
basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -87,14 +88,14 @@ public class TestConsistencyGuard {
@Test(expected = TimeoutException.class)
public void testCheckFailingAppears() throws Exception {
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
- ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+ ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig());
passing.waitTillFileAppears(new Path(basePath +
"/partition/path/f1_1-0-2_000.parquet"));
}
@Test(expected = TimeoutException.class)
public void testCheckFailingDisappear() throws Exception {
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
- ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+ ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig());
passing.waitTillAllFilesDisappear(basePath + "/partition/path",
Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet",
basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -104,7 +105,17 @@ public class TestConsistencyGuard {
public void testCheckFailingDisappears() throws Exception {
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000",
"f1");
- ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+ ConsistencyGuard passing = new FailSafeConsistencyGuard(fs,
getConsistencyGuardConfig());
passing.waitTillFileDisappears(new Path(basePath +
"/partition/path/f1_1-0-1_000.parquet"));
}
+
+ private ConsistencyGuardConfig getConsistencyGuardConfig() {
+ return getConsistencyGuardConfig(3, 10, 10);
+ }
+
 + private ConsistencyGuardConfig getConsistencyGuardConfig(int maxChecks, int
initialSleep, int maxSleep) {
 + return
ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
 +
.withInitialConsistencyCheckIntervalMs(initialSleep).withMaxConsistencyCheckIntervalMs(maxSleep)
 + .withMaxConsistencyChecks(maxChecks).build();
 + }
}
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
index a668d3b..7f0f0d6 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue;
import com.uber.hoodie.common.HoodieCleanStat;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
-import com.uber.hoodie.common.TestRawTripPayload;
+import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
@@ -37,12 +37,14 @@ import com.uber.hoodie.common.table.SyncableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
import com.uber.hoodie.common.table.view.FileSystemViewStorageType;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
+import com.uber.hoodie.index.HoodieIndex.IndexType;
import com.uber.hoodie.table.HoodieTable;
import java.io.File;
import java.io.IOException;
@@ -191,12 +193,12 @@ public class TestHoodieClientBase implements Serializable
{
return
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-
.withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class)
- .withConsistencyCheckEnabled(true)
+ .withWriteStatusClass(MetadataMergeWriteStatus.class)
+
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
* 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024
* 1024).build())
.forTable("test-trip-table")
-
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(
FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
.build());
diff --git
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
index aa53d9f..ad5fae8 100644
---
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
@@ -39,6 +39,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.common.util.collection.Pair;
@@ -686,8 +687,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
private Pair<Path, JavaRDD<WriteStatus>>
testConsistencyCheck(HoodieTableMetaClient metaClient, String commitTime)
throws Exception {
- HoodieWriteConfig cfg =
getConfigBuilder().withAutoCommit(false).withMaxConsistencyCheckIntervalMs(1)
- .withInitialConsistencyCheckIntervalMs(1).build();
+ HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
+ .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+ .withConsistencyCheckEnabled(true)
+ .withMaxConsistencyCheckIntervalMs(1)
+ .withInitialConsistencyCheckIntervalMs(1)
+ .build())
+ .build();
HoodieWriteClient client = getHoodieWriteClient(cfg);
client.startCommitWithTime(commitTime);
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
index 14e76ac..72e2d57 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
@@ -22,6 +22,7 @@ import com.uber.hoodie.common.storage.StorageSchemes;
import com.uber.hoodie.common.util.ConsistencyGuard;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.NoOpConsistencyGuard;
+import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.net.URI;
@@ -236,7 +237,28 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public boolean rename(Path src, Path dst) throws IOException {
- return fileSystem.rename(convertToDefaultPath(src),
convertToDefaultPath(dst));
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(src));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for " + src + " to appear",
e);
+ }
+
+ boolean success = fileSystem.rename(convertToDefaultPath(src),
convertToDefaultPath(dst));
+
+ if (success) {
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for " + dst + " to
appear", e);
+ }
+
+ try {
+ consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for " + src + " to
disappear", e);
+ }
+ }
+ return success;
}
@Override
@@ -247,7 +269,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
try {
consistencyGuard.waitTillFileDisappears(f);
} catch (TimeoutException e) {
- return false;
+ throw new HoodieException("Timed out waiting for " + f + " to
disappear", e);
}
}
return success;
@@ -270,7 +292,15 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
- return fileSystem.mkdirs(convertToDefaultPath(f), permission);
+ boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission);
+ if (success) {
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for directory " + f + "
to appear", e);
+ }
+ }
+ return success;
}
@Override
@@ -353,31 +383,39 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int
bufferSize,
short replication, long blockSize, Progressable progress) throws
IOException {
- return fileSystem
- .createNonRecursive(convertToDefaultPath(f), overwrite, bufferSize,
replication, blockSize,
- progress);
+ Path p = convertToDefaultPath(f);
+ return wrapOutputStream(p, fileSystem.createNonRecursive(p, overwrite,
bufferSize, replication, blockSize,
+ progress));
}
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission
permission, boolean overwrite,
int bufferSize, short replication, long blockSize, Progressable
progress) throws IOException {
- return fileSystem
- .createNonRecursive(convertToDefaultPath(f), permission, overwrite,
bufferSize, replication,
- blockSize, progress);
+ Path p = convertToDefaultPath(f);
+ return wrapOutputStream(p, fileSystem.createNonRecursive(p, permission,
overwrite, bufferSize, replication,
+ blockSize, progress));
}
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication, long
blockSize,
Progressable progress) throws IOException {
- return fileSystem
- .createNonRecursive(convertToDefaultPath(f), permission, flags,
bufferSize, replication,
- blockSize, progress);
+ Path p = convertToDefaultPath(f);
+ return wrapOutputStream(p, fileSystem.createNonRecursive(p, permission,
flags, bufferSize, replication,
+ blockSize, progress));
}
@Override
public boolean createNewFile(Path f) throws IOException {
- return fileSystem.createNewFile(convertToDefaultPath(f));
+ boolean newFile = fileSystem.createNewFile(convertToDefaultPath(f));
+ if (newFile) {
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for " + f + " to appear",
e);
+ }
+ }
+ return newFile;
}
@Override
@@ -394,6 +432,11 @@ public class HoodieWrapperFileSystem extends FileSystem {
public void concat(Path trg, Path[] psrcs) throws IOException {
Path[] psrcsNew = convertDefaults(psrcs);
fileSystem.concat(convertToDefaultPath(trg), psrcsNew);
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(trg));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for " + trg + " to appear",
e);
+ }
}
@Override
@@ -408,7 +451,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public boolean delete(Path f) throws IOException {
- return fileSystem.delete(convertToDefaultPath(f));
+ return delete(f, true);
}
@Override
@@ -493,62 +536,100 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public boolean mkdirs(Path f) throws IOException {
- return fileSystem.mkdirs(convertToDefaultPath(f));
+ boolean success = fileSystem.mkdirs(convertToDefaultPath(f));
+ if (success) {
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for directory " + f + "
to appear", e);
+ }
+ }
+ return success;
}
@Override
public void copyFromLocalFile(Path src, Path dst) throws IOException {
- fileSystem.copyFromLocalFile(convertToDefaultPath(src),
convertToDefaultPath(dst));
+ fileSystem.copyFromLocalFile(convertToLocalPath(src),
convertToDefaultPath(dst));
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for destination " + dst + "
to appear", e);
+ }
}
@Override
public void moveFromLocalFile(Path[] srcs, Path dst) throws IOException {
- fileSystem.moveFromLocalFile(convertDefaults(srcs),
convertToDefaultPath(dst));
+ fileSystem.moveFromLocalFile(convertLocalPaths(srcs),
convertToDefaultPath(dst));
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for destination " + dst + "
to appear", e);
+ }
}
@Override
public void moveFromLocalFile(Path src, Path dst) throws IOException {
- fileSystem.moveFromLocalFile(convertToDefaultPath(src),
convertToDefaultPath(dst));
+ fileSystem.moveFromLocalFile(convertToLocalPath(src),
convertToDefaultPath(dst));
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for destination " + dst + "
to appear", e);
+ }
}
@Override
public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws
IOException {
- fileSystem.copyFromLocalFile(delSrc, convertToDefaultPath(src),
convertToDefaultPath(dst));
+ fileSystem.copyFromLocalFile(delSrc, convertToLocalPath(src),
convertToDefaultPath(dst));
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for destination " + dst + "
to appear", e);
+ }
}
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[]
srcs, Path dst)
throws IOException {
fileSystem
- .copyFromLocalFile(delSrc, overwrite, convertDefaults(srcs),
convertToDefaultPath(dst));
+ .copyFromLocalFile(delSrc, overwrite, convertLocalPaths(srcs),
convertToDefaultPath(dst));
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for destination " + dst + "
to appear", e);
+ }
}
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst)
throws IOException {
fileSystem
- .copyFromLocalFile(delSrc, overwrite, convertToDefaultPath(src),
convertToDefaultPath(dst));
+ .copyFromLocalFile(delSrc, overwrite, convertToLocalPath(src),
convertToDefaultPath(dst));
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+ } catch (TimeoutException e) {
+ throw new HoodieException("Timed out waiting for destination " + dst + "
to appear", e);
+ }
}
@Override
public void copyToLocalFile(Path src, Path dst) throws IOException {
- fileSystem.copyToLocalFile(convertToDefaultPath(src),
convertToDefaultPath(dst));
+ fileSystem.copyToLocalFile(convertToDefaultPath(src),
convertToLocalPath(dst));
}
@Override
public void moveToLocalFile(Path src, Path dst) throws IOException {
- fileSystem.moveToLocalFile(convertToDefaultPath(src),
convertToDefaultPath(dst));
+ fileSystem.moveToLocalFile(convertToDefaultPath(src),
convertToLocalPath(dst));
}
@Override
public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws
IOException {
- fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src),
convertToDefaultPath(dst));
+ fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src),
convertToLocalPath(dst));
}
@Override
public void copyToLocalFile(boolean delSrc, Path src, Path dst, boolean
useRawLocalFileSystem)
throws IOException {
- fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src),
convertToDefaultPath(dst),
+ fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src),
convertToLocalPath(dst),
useRawLocalFileSystem);
}
@@ -787,6 +868,22 @@ public class HoodieWrapperFileSystem extends FileSystem {
return convertPathWithScheme(oldPath, fileSystem.getScheme());
}
+ private Path convertToLocalPath(Path oldPath) {
+ try {
+ return convertPathWithScheme(oldPath,
FileSystem.getLocal(getConf()).getScheme());
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }
+
+ private Path[] convertLocalPaths(Path[] psrcs) {
+ Path[] psrcsNew = new Path[psrcs.length];
+ for (int i = 0; i < psrcs.length; i++) {
+ psrcsNew[i] = convertToLocalPath(psrcs[i]);
+ }
+ return psrcsNew;
+ }
+
private Path[] convertDefaults(Path[] psrcs) {
Path[] psrcsNew = new Path[psrcs.length];
for (int i = 0; i < psrcs.length; i++) {
@@ -803,4 +900,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
throw new IllegalArgumentException(file.toString()
+ " does not have a open stream. Cannot get the bytes written on the
stream");
}
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
}
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index c8dd7d2..7ed0214 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -20,12 +20,17 @@ package com.uber.hoodie.common.table;
import static com.uber.hoodie.common.model.HoodieTableType.MERGE_ON_READ;
+import com.google.common.base.Preconditions;
import com.uber.hoodie.common.SerializableConfiguration;
+import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
+import com.uber.hoodie.common.util.NoOpConsistencyGuard;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import java.io.File;
@@ -66,13 +71,14 @@ public class HoodieTableMetaClient implements Serializable {
public static final String MARKER_EXTN = ".marker";
private String basePath;
- private transient FileSystem fs;
+ private transient HoodieWrapperFileSystem fs;
private String metaPath;
private SerializableConfiguration hadoopConf;
private HoodieTableType tableType;
private HoodieTableConfig tableConfig;
private HoodieActiveTimeline activeTimeline;
private HoodieArchivedTimeline archivedTimeline;
+ private ConsistencyGuardConfig consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().build();
public HoodieTableMetaClient(Configuration conf, String basePath)
throws DatasetNotFoundException {
@@ -81,13 +87,19 @@ public class HoodieTableMetaClient implements Serializable {
}
public HoodieTableMetaClient(Configuration conf, String basePath,
- boolean loadActiveTimelineOnLoad)
+ boolean loadActiveTimelineOnLoad) {
+ this(conf, basePath, loadActiveTimelineOnLoad,
ConsistencyGuardConfig.newBuilder().build());
+ }
+
+ public HoodieTableMetaClient(Configuration conf, String basePath,
+ boolean loadActiveTimelineOnLoad, ConsistencyGuardConfig
consistencyGuardConfig)
throws DatasetNotFoundException {
log.info("Loading HoodieTableMetaClient from " + basePath);
this.basePath = basePath;
+ this.consistencyGuardConfig = consistencyGuardConfig;
this.hadoopConf = new SerializableConfiguration(conf);
Path basePathDir = new Path(this.basePath);
- this.metaPath = basePath + File.separator + METAFOLDER_NAME;
+ this.metaPath = new Path(basePath, METAFOLDER_NAME).toString();
Path metaPathDir = new Path(this.metaPath);
this.fs = getFs();
DatasetNotFoundException.checkValidDataset(fs, basePathDir, metaPathDir);
@@ -190,13 +202,25 @@ public class HoodieTableMetaClient implements
Serializable {
/**
* Get the FS implementation for this table
*/
- public FileSystem getFs() {
+ public HoodieWrapperFileSystem getFs() {
if (fs == null) {
- fs = FSUtils.getFs(metaPath, hadoopConf.get());
+ FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.get());
+ Preconditions.checkArgument(!(fileSystem instanceof
HoodieWrapperFileSystem),
+ "File System not expected to be that of HoodieWrapperFileSystem");
+ fs = new HoodieWrapperFileSystem(fileSystem,
consistencyGuardConfig.isConsistencyCheckEnabled()
+ ? new FailSafeConsistencyGuard(fileSystem, consistencyGuardConfig)
: new NoOpConsistencyGuard());
}
return fs;
}
+ /**
+ * Return the raw (unwrapped) file-system underlying this meta-client.
+ * @return the underlying {@code FileSystem} instance
+ */
+ public FileSystem getRawFs() {
+ return getFs().getFileSystem();
+ }
+
public Configuration getHadoopConf() {
return hadoopConf.get();
}
@@ -223,6 +247,10 @@ public class HoodieTableMetaClient implements Serializable
{
return activeTimeline;
}
+ public ConsistencyGuardConfig getConsistencyGuardConfig() {
+ return consistencyGuardConfig;
+ }
+
/**
* Get the archived commits as a timeline. This is costly operation, as all
data from the archived
* files are read. This should not be used, unless for historical debugging
purposes
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java
new file mode 100644
index 0000000..1418191
--- /dev/null
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import com.uber.hoodie.config.DefaultHoodieConfig;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+public class ConsistencyGuardConfig extends DefaultHoodieConfig {
+
+ private static final String CONSISTENCY_CHECK_ENABLED_PROP =
"hoodie.consistency.check.enabled";
+ private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
+
+ // time between successive attempts to ensure written data's metadata is
consistent on storage
+ private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
+ "hoodie.consistency.check.initial_interval_ms";
+ private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+
+ // maximum wait interval between successive consistency checks
+ private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
"hoodie.consistency.check.max_interval_ms";
+ private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+
+ // maximum number of consistency checks for written data; waits up to
256 seconds in total
+ private static final String MAX_CONSISTENCY_CHECKS_PROP =
"hoodie.consistency.check.max_checks";
+ private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+
+ public ConsistencyGuardConfig(Properties props) {
+ super(props);
+ }
+
+ public static ConsistencyGuardConfig.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public boolean isConsistencyCheckEnabled() {
+ return
Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
+ }
+
+ public int getMaxConsistencyChecks() {
+ return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECKS_PROP));
+ }
+
+ public int getInitialConsistencyCheckIntervalMs() {
+ return
Integer.parseInt(props.getProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
+ }
+
+ public int getMaxConsistencyCheckIntervalMs() {
+ return
Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public Builder fromFile(File propertiesFile) throws IOException {
+ FileReader reader = new FileReader(propertiesFile);
+ try {
+ props.load(reader);
+ return this;
+ } finally {
+ reader.close();
+ }
+ }
+
+ public Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public Builder withConsistencyCheckEnabled(boolean enabled) {
+ props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP,
String.valueOf(enabled));
+ return this;
+ }
+
+ public Builder withInitialConsistencyCheckIntervalMs(int initialIntervalMs)
{
+ props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP,
String.valueOf(initialIntervalMs));
+ return this;
+ }
+
+ public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
+ props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP,
String.valueOf(maxIntervalMs));
+ return this;
+ }
+
+ public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
+ props.setProperty(MAX_CONSISTENCY_CHECKS_PROP,
String.valueOf(maxConsistencyChecks));
+ return this;
+ }
+
+ public ConsistencyGuardConfig build() {
+ setDefaultOnCondition(props,
!props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
+ CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
+ setDefaultOnCondition(props,
!props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+ INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP,
String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS));
+ setDefaultOnCondition(props,
!props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+ MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP,
String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS));
+ setDefaultOnCondition(props,
!props.containsKey(MAX_CONSISTENCY_CHECKS_PROP),
+ MAX_CONSISTENCY_CHECKS_PROP,
String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
+
+ return new ConsistencyGuardConfig(props);
+ }
+ }
+}
diff --git
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
index f69fcef..ee215be 100644
---
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
+++
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
@@ -18,6 +18,7 @@
package com.uber.hoodie.common.util;
+import com.google.common.base.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,15 +41,12 @@ public class FailSafeConsistencyGuard implements
ConsistencyGuard {
private static final transient Logger log =
LogManager.getLogger(FailSafeConsistencyGuard.class);
private final FileSystem fs;
- private final int maxAttempts;
- private final long initialDelayMs;
- private final long maxDelayMs;
+ private final ConsistencyGuardConfig consistencyGuardConfig;
- public FailSafeConsistencyGuard(FileSystem fs, int maxAttempts, long
initalDelayMs, long maxDelayMs) {
+ public FailSafeConsistencyGuard(FileSystem fs, ConsistencyGuardConfig
consistencyGuardConfig) {
this.fs = fs;
- this.maxAttempts = maxAttempts;
- this.initialDelayMs = initalDelayMs;
- this.maxDelayMs = maxDelayMs;
+ this.consistencyGuardConfig = consistencyGuardConfig;
+
Preconditions.checkArgument(consistencyGuardConfig.isConsistencyCheckEnabled());
}
@Override
@@ -121,13 +119,13 @@ public class FailSafeConsistencyGuard implements
ConsistencyGuard {
*/
private boolean checkFileVisibility(Path filePath, FileVisibility
visibility) throws IOException {
try {
- FileStatus[] status = fs.listStatus(filePath);
+ FileStatus status = fs.getFileStatus(filePath);
switch (visibility) {
case APPEAR:
- return status.length != 0;
+ return status != null;
case DISAPPEAR:
default:
- return status.length == 0;
+ return status == null;
}
} catch (FileNotFoundException nfe) {
switch (visibility) {
@@ -147,9 +145,9 @@ public class FailSafeConsistencyGuard implements
ConsistencyGuard {
* @throws TimeoutException
*/
private void waitForFileVisibility(Path filePath, FileVisibility visibility)
throws TimeoutException {
- long waitMs = initialDelayMs;
+ long waitMs =
consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
int attempt = 0;
- while (attempt < maxAttempts) {
+ while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) {
try {
if (checkFileVisibility(filePath, visibility)) {
return;
@@ -160,7 +158,7 @@ public class FailSafeConsistencyGuard implements
ConsistencyGuard {
sleepSafe(waitMs);
waitMs = waitMs * 2; // double check interval every attempt
- waitMs = waitMs > maxDelayMs ? maxDelayMs : waitMs;
+ waitMs = Math.min(waitMs,
consistencyGuardConfig.getMaxConsistencyCheckIntervalMs());
attempt++;
}
throw new TimeoutException("Timed-out waiting for the file to " +
visibility.name());
@@ -173,17 +171,17 @@ public class FailSafeConsistencyGuard implements
ConsistencyGuard {
* @throws TimeoutException when retries are exhausted
*/
private void retryTillSuccess(Function<Integer, Boolean> predicate, String
timedOutMessage) throws TimeoutException {
- long waitMs = initialDelayMs;
+ long waitMs =
consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
int attempt = 0;
- log.warn("Max Attempts=" + maxAttempts);
- while (attempt < maxAttempts) {
+ log.info("Max Attempts=" +
consistencyGuardConfig.getMaxConsistencyChecks());
+ while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) {
boolean success = predicate.apply(attempt);
if (success) {
return;
}
sleepSafe(waitMs);
waitMs = waitMs * 2; // double check interval every attempt
- waitMs = waitMs > maxDelayMs ? maxDelayMs : waitMs;
+ waitMs = Math.min(waitMs,
consistencyGuardConfig.getMaxConsistencyCheckIntervalMs());
attempt++;
}
throw new TimeoutException(timedOutMessage);