vinothchandar commented on a change in pull request #692: HUDI-70 : Making
DeltaStreamer run in continuous mode with concurrent compaction
URL: https://github.com/apache/incubator-hudi/pull/692#discussion_r292162147
##########
File path:
hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -182,190 +86,30 @@ private static HiveConf getDefaultHiveConf(Configuration
cfg) {
}
public void sync() throws Exception {
- HoodieDeltaStreamerMetrics metrics = new
HoodieDeltaStreamerMetrics(getHoodieClientConfig(null));
- Timer.Context overallTimerContext = metrics.getOverallTimerContext();
- // Retrieve the previous round checkpoints, if any
- Optional<String> resumeCheckpointStr = Optional.empty();
- if (commitTimelineOpt.isPresent()) {
- Optional<HoodieInstant> lastCommit =
commitTimelineOpt.get().lastInstant();
- if (lastCommit.isPresent()) {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
- commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(),
HoodieCommitMetadata.class);
- if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
- resumeCheckpointStr =
Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
- } else {
- throw new HoodieDeltaStreamerException(
- "Unable to find previous checkpoint. Please double check if this
table "
- + "was indeed built via delta streamer ");
- }
- }
- } else {
- HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(),
cfg.targetBasePath,
- cfg.storageType, cfg.targetTableName, "archived");
- }
- log.info("Checkpoint to resume from : " + resumeCheckpointStr);
-
- final Optional<JavaRDD<GenericRecord>> avroRDDOptional;
- final String checkpointStr;
- final SchemaProvider schemaProvider;
- if (transformer != null) {
- // Transformation is needed. Fetch New rows in Row Format, apply
transformation and then convert them
- // to generic records for writing
- InputBatch<Dataset<Row>> dataAndCheckpoint =
formatAdapter.fetchNewDataInRowFormat(
- resumeCheckpointStr, cfg.sourceLimit);
-
- Optional<Dataset<Row>> transformed =
- dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc,
sparkSession, data, props));
- checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
- avroRDDOptional = transformed.map(t ->
- AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME,
HOODIE_RECORD_NAMESPACE).toJavaRDD()
- );
- // Use Transformed Row's schema if not overridden
- schemaProvider =
- this.schemaProvider == null ? transformed.map(r ->
(SchemaProvider)new RowBasedSchemaProvider(r.schema()))
- .orElse(dataAndCheckpoint.getSchemaProvider()) :
this.schemaProvider;
- } else {
- // Pull the data from the source & prepare the write
- InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
- formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr,
cfg.sourceLimit);
- avroRDDOptional = dataAndCheckpoint.getBatch();
- checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
- schemaProvider = dataAndCheckpoint.getSchemaProvider();
- }
-
- if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
- log.info("No new data, nothing to commit.. ");
- return;
- }
-
- registerAvroSchemas(schemaProvider);
-
- JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
- JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
- HoodieRecordPayload payload =
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
- (Comparable) DataSourceUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField));
- return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
- });
-
- // filter dupes if needed
- HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
- if (cfg.filterDupes) {
- // turn upserts to insert
- cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT :
cfg.operation;
- records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg);
-
- if (records.isEmpty()) {
- log.info("No new data, nothing to commit.. ");
- return;
- }
- }
-
- // Perform the write
- HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true);
- String commitTime = client.startCommit();
- log.info("Starting commit : " + commitTime);
-
- JavaRDD<WriteStatus> writeStatusRDD;
- if (cfg.operation == Operation.INSERT) {
- writeStatusRDD = client.insert(records, commitTime);
- } else if (cfg.operation == Operation.UPSERT) {
- writeStatusRDD = client.upsert(records, commitTime);
- } else if (cfg.operation == Operation.BULK_INSERT) {
- writeStatusRDD = client.bulkInsert(records, commitTime);
- } else {
- throw new HoodieDeltaStreamerException("Unknown operation :" +
cfg.operation);
- }
-
- long totalErrorRecords = writeStatusRDD.mapToDouble(ws ->
ws.getTotalErrorRecords()).sum().longValue();
- long totalRecords = writeStatusRDD.mapToDouble(ws ->
ws.getTotalRecords()).sum().longValue();
- boolean hasErrors = totalErrorRecords > 0;
- long hiveSyncTimeMs = 0;
- if (!hasErrors || cfg.commitOnErrors) {
- HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
- checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
-
- if (hasErrors) {
- log.warn("Some records failed to be merged but forcing commit since
commitOnErrors set. Errors/Total="
- + totalErrorRecords + "/" + totalRecords);
- }
-
- boolean success = client.commit(commitTime, writeStatusRDD,
- Optional.of(checkpointCommitMetadata));
- if (success) {
- log.info("Commit " + commitTime + " successful!");
- // Sync to hive if enabled
- Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
- syncHive();
- hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
- } else {
- log.info("Commit " + commitTime + " failed!");
- }
+ if (cfg.continuousMode) {
+ deltaSync.start(this::onDataSyncShutdown);
 Review comment:
   Should this be named `onDeltaSyncShutdown`, for consistency with the `deltaSync` field it is registered on?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services