vinothchandar commented on a change in pull request #692: HUDI-70 : Making DeltaStreamer run in continuous mode with concurrent compaction URL: https://github.com/apache/incubator-hudi/pull/692#discussion_r292157009
########## File path: hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java ########## @@ -0,0 +1,552 @@ +/* + * Copyright (c) 2019 Uber Technologies, Inc. ([email protected]) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.uber.hoodie.utilities.deltastreamer; + +import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION; +import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; +import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; + +import com.codahale.metrics.Timer; +import com.uber.hoodie.AvroConversionUtils; +import com.uber.hoodie.DataSourceUtils; +import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.KeyGenerator; +import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; +import com.uber.hoodie.common.util.CompactionUtils; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.util.TypedProperties; +import 
com.uber.hoodie.common.util.collection.Pair; +import com.uber.hoodie.config.HoodieCompactionConfig; +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.hive.HiveSyncConfig; +import com.uber.hoodie.hive.HiveSyncTool; +import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.utilities.UtilHelpers; +import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation; +import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException; +import com.uber.hoodie.utilities.schema.RowBasedSchemaProvider; +import com.uber.hoodie.utilities.schema.SchemaProvider; +import com.uber.hoodie.utilities.sources.InputBatch; +import com.uber.hoodie.utilities.transform.Transformer; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import scala.collection.JavaConversions; + +/** + * Ingests data either as a single-run or in continuous mode. 
+ */ +public class DeltaSync extends AbstractBaseService { + + public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; + + /** + * Delta Sync Config + */ + private final HoodieDeltaStreamer.Config cfg; + + /** + * Source to pull deltas from + */ + private transient SourceFormatAdapter formatAdapter; + + /** + * Schema provider that supplies the command for reading the input and writing out the target table. + */ + private transient SchemaProvider schemaProvider; + + /** + * Allows transforming source to target dataset before writing + */ + private transient Transformer transformer; + + /** + * Extract the key for the target dataset + */ + private KeyGenerator keyGenerator; + + /** + * Filesystem used + */ + private transient FileSystem fs; + + /** + * Timeline with completed commits + */ + private transient Optional<HoodieTimeline> commitTimelineOpt; + + /** + * Spark context + */ + private transient JavaSparkContext jssc; + + /** + * Spark Session + */ + private transient SparkSession sparkSession; + + /** + * Hive Config + */ + private transient HiveConf hiveConf; + + /** + * Bag of properties with source, hoodie client, key generator etc. 
+ */ + TypedProperties props; + + private HoodieWriteClient writeClient; + private AsyncCompactor compactor; + private final HoodieTableType tableType; + + public DeltaSync(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) + throws IOException { + this.cfg = cfg; + this.jssc = jssc; + this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); + this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()); + + if (fs.exists(new Path(cfg.targetBasePath))) { + HoodieTableMetaClient meta = new HoodieTableMetaClient( + new Configuration(fs.getConf()), cfg.targetBasePath, false); + tableType = meta.getTableType(); + } else { + tableType = HoodieTableType.valueOf(cfg.storageType); + } + + refreshTimeline(); + this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); + log.info("Creating delta streamer with configs : " + props.toString()); + this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc); + this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName); + this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props); + + this.formatAdapter = + new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, + schemaProvider)); + + this.hiveConf = hiveConf; + if (cfg.filterDupes) { + cfg.operation = cfg.operation == Operation.UPSERT ? 
Operation.INSERT : cfg.operation; + } + // If schemaRegistry already resolved, setup write-client and start compaction + setupWriteClientAndStartCompaction(); + } + + /** + * After every round, timeline needs to be updated + * @throws IOException + */ + private void refreshTimeline() throws IOException { + if (fs.exists(new Path(cfg.targetBasePath))) { + HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath); + this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants()); + } else { + this.commitTimelineOpt = Optional.empty(); + HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, + cfg.storageType, cfg.targetTableName, "archived"); + } + } + + @Override + protected Pair<CompletableFuture, ExecutorService> startService() { + ExecutorService executor = Executors.newFixedThreadPool(1); + return Pair.of(CompletableFuture.supplyAsync(() -> { + boolean error = false; + if (cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)) { + // set Scheduler Pool. + log.info("Setting Spark Pool name for ingestion to " + SchedulerConfGenerator.INGEST_POOL_NAME); + jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.INGEST_POOL_NAME); + } + try { + while (!isShutdownRequested()) { + try { + syncOneRound(); + } catch (Exception e) { + log.error("Shutting down delta-sync due to exception", e); + error = true; + throw new HoodieException(e.getMessage(), e); + } + } + } finally { + shutdownCompactor(error); + } + return true; + }, executor), executor); + } + + /** + * Shutdown compactor as DeltaSync is shutdown + * @param error + */ + private void shutdownCompactor(boolean error) { + log.info("Delta Sync shutdown. Error ?" 
+ error); + if (compactor != null) { + log.warn("Gracefully shutting down compactor"); + compactor.shutdown(false); + } + } + + /** + * Run one round of ingestion and return + * @throws Exception + */ + public void syncOneRound() throws Exception { Review comment: rename `syncOneRound` to `syncOnce` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
