vinothchandar commented on a change in pull request #692: HUDI-70 : Making 
DeltaStreamer run in continuous mode with concurrent compaction
URL: https://github.com/apache/incubator-hudi/pull/692#discussion_r292162290
 
 

 ##########
 File path: 
hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
 ##########
 @@ -182,190 +86,30 @@ private static HiveConf getDefaultHiveConf(Configuration 
cfg) {
   }
 
   public void sync() throws Exception {
-    HoodieDeltaStreamerMetrics metrics = new 
HoodieDeltaStreamerMetrics(getHoodieClientConfig(null));
-    Timer.Context overallTimerContext = metrics.getOverallTimerContext();
-    // Retrieve the previous round checkpoints, if any
-    Optional<String> resumeCheckpointStr = Optional.empty();
-    if (commitTimelineOpt.isPresent()) {
-      Optional<HoodieInstant> lastCommit = 
commitTimelineOpt.get().lastInstant();
-      if (lastCommit.isPresent()) {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-            commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), 
HoodieCommitMetadata.class);
-        if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
-          resumeCheckpointStr = 
Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
-        } else {
-          throw new HoodieDeltaStreamerException(
-              "Unable to find previous checkpoint. Please double check if this 
table "
-                  + "was indeed built via delta streamer ");
-        }
-      }
-    } else {
-      HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(), 
cfg.targetBasePath,
-          cfg.storageType, cfg.targetTableName, "archived");
-    }
-    log.info("Checkpoint to resume from : " + resumeCheckpointStr);
-
-    final Optional<JavaRDD<GenericRecord>> avroRDDOptional;
-    final String checkpointStr;
-    final SchemaProvider schemaProvider;
-    if (transformer != null) {
-      // Transformation is needed. Fetch New rows in Row Format, apply 
transformation and then convert them
-      // to generic records for writing
-      InputBatch<Dataset<Row>> dataAndCheckpoint = 
formatAdapter.fetchNewDataInRowFormat(
-          resumeCheckpointStr, cfg.sourceLimit);
-
-      Optional<Dataset<Row>> transformed =
-          dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, 
sparkSession, data, props));
-      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      avroRDDOptional = transformed.map(t ->
-         AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE).toJavaRDD()
-      );
-      // Use Transformed Row's schema if not overridden
-      schemaProvider =
-          this.schemaProvider == null ? transformed.map(r -> 
(SchemaProvider)new RowBasedSchemaProvider(r.schema()))
-              .orElse(dataAndCheckpoint.getSchemaProvider()) : 
this.schemaProvider;
-    } else {
-      // Pull the data from the source & prepare the write
-      InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
-          formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, 
cfg.sourceLimit);
-      avroRDDOptional = dataAndCheckpoint.getBatch();
-      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      schemaProvider = dataAndCheckpoint.getSchemaProvider();
-    }
-
-    if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
-      log.info("No new data, nothing to commit.. ");
-      return;
-    }
-
-    registerAvroSchemas(schemaProvider);
-
-    JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
-    JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
-      HoodieRecordPayload payload = 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) DataSourceUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField));
-      return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
-    });
-
-    // filter dupes if needed
-    HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
-    if (cfg.filterDupes) {
-      // turn upserts to insert
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : 
cfg.operation;
-      records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg);
-
-      if (records.isEmpty()) {
-        log.info("No new data, nothing to commit.. ");
-        return;
-      }
-    }
-
-    // Perform the write
-    HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true);
-    String commitTime = client.startCommit();
-    log.info("Starting commit  : " + commitTime);
-
-    JavaRDD<WriteStatus> writeStatusRDD;
-    if (cfg.operation == Operation.INSERT) {
-      writeStatusRDD = client.insert(records, commitTime);
-    } else if (cfg.operation == Operation.UPSERT) {
-      writeStatusRDD = client.upsert(records, commitTime);
-    } else if (cfg.operation == Operation.BULK_INSERT) {
-      writeStatusRDD = client.bulkInsert(records, commitTime);
-    } else {
-      throw new HoodieDeltaStreamerException("Unknown operation :" + 
cfg.operation);
-    }
-
-    long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> 
ws.getTotalErrorRecords()).sum().longValue();
-    long totalRecords = writeStatusRDD.mapToDouble(ws -> 
ws.getTotalRecords()).sum().longValue();
-    boolean hasErrors = totalErrorRecords > 0;
-    long hiveSyncTimeMs = 0;
-    if (!hasErrors || cfg.commitOnErrors) {
-      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
-      checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
-
-      if (hasErrors) {
-        log.warn("Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total="
-            + totalErrorRecords + "/" + totalRecords);
-      }
-
-      boolean success = client.commit(commitTime, writeStatusRDD,
-          Optional.of(checkpointCommitMetadata));
-      if (success) {
-        log.info("Commit " + commitTime + " successful!");
-        // Sync to hive if enabled
-        Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
-        syncHive();
-        hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
-      } else {
-        log.info("Commit " + commitTime + " failed!");
-      }
+    if (cfg.continuousMode) {
+      deltaSync.start(this::onDataSyncShutdown);
+      deltaSync.waitForShutdown();
+      log.info("Delta Sync shutting down");
     } else {
-      log.error("There are errors when ingesting records. Errors/Total="
-          + totalErrorRecords + "/" + totalRecords);
-      log.error("Printing out the top 100 errors");
-      writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
-        log.error("Global error :", ws.getGlobalError());
-        if (ws.getErrors().size() > 0) {
-          ws.getErrors().entrySet().forEach(r ->
-              log.trace("Error for key:" + r.getKey() + " is " + 
r.getValue()));
-        }
-      });
-    }
-    client.close();
-    long overallTimeMs = overallTimerContext != null ? 
overallTimerContext.stop() : 0;
-
-    // Send DeltaStreamer Metrics
-    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
-  }
-
-  public void syncHive() {
-    if (cfg.enableHiveSync) {
-      HiveSyncConfig hiveSyncConfig = 
DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
-      log.info("Syncing target hoodie table with hive table(" + 
hiveSyncConfig.tableName
-          + "). Hive metastore URL :" + hiveSyncConfig.jdbcUrl + ", basePath 
:" + cfg.targetBasePath);
-
-      new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
+      log.info("Delta Streamer running only single round");
+      deltaSync.syncOneRound();
+      deltaSync.close();
+      log.info("Shut down deltastreamer");
     }
   }
 
-  /**
-   * Register Avro Schemas
-   * @param schemaProvider Schema Provider
-   */
-  private void registerAvroSchemas(SchemaProvider schemaProvider) {
-    // register the schemas, so that shuffle does not serialize the full 
schemas
-    if (null != schemaProvider) {
-      List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), 
schemaProvider.getTargetSchema());
-      log.info("Registering Schema :" + schemas);
-      
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
-    }
-  }
-
-  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider 
schemaProvider) {
-    HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
-            .withAutoCommit(false).combineInput(cfg.filterDupes, true)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                .withPayloadClass(cfg.payloadClassName)
-                // turn on inline compaction by default, for MOR tables
-                .withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) 
== HoodieTableType.MERGE_ON_READ)
-                .build())
-            .forTable(cfg.targetTableName)
-            
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-            .withProps(props);
-    if (null != schemaProvider) {
-      builder = 
builder.withSchema(schemaProvider.getTargetSchema().toString());
-    }
-
-    return builder.build();
+  private boolean onDataSyncShutdown(boolean error) {
+    log.info("DataSync shutdown. Closing write client. Error?" + error);
 
 Review comment:
   DeltaSync? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to