codope commented on code in PR #6665:
URL: https://github.com/apache/hudi/pull/6665#discussion_r981446382
##########
hudi-utilities/pom.xml:
##########
@@ -443,6 +443,23 @@
<artifactId>aws-java-sdk-sqs</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
+
+ <!-- start: GCS Incremental Ingestion -->
+ <dependency>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-pubsub</artifactId>
+ <version>1.120.0</version>
Review Comment:
We can declare the version at the top, just as we do for other
dependencies. Take a look at `aws.sdk.version`.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
+import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity;
+import org.apache.hudi.utilities.sources.helpers.gcs.MetadataMsg;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ACK_MESSAGES;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ACK_MESSAGES_DEFAULT_VALUE;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.BATCH_SIZE_CONF;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_BATCH_SIZE;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GOOGLE_PROJECT_ID;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.PUBSUB_SUBSCRIPTION_ID;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/*
+ * An incremental source to fetch from a Google Cloud Pubsub topic (a
subscription, to be precise),
+ * and download them into a Hudi table. The messages are assumed to be of type
Cloud Storage Pubsub Notification.
+ *
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and
Mysql is used for Hive Metastore).
+
+
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+This class can be invoked via spark-submit as follows. There's a bunch of
optional hive sync flags at the end:
+$ bin/spark-submit \
+--driver-memory 4g \
+--executor-memory 4g \
+--packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+--source-class org.apache.hudi.utilities.sources.GcsEventsSource \
+--op INSERT \
+--hoodie-conf hoodie.datasource.write.recordkey.field="id" \
+--source-ordering-field timeCreated \
+--hoodie-conf hoodie.index.type=GLOBAL_BLOOM \
+--filter-dupes \
+--allow-commit-on-no-checkpoint-change \
+--hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+--hoodie-conf hoodie.combine.before.insert=true \
+--hoodie-conf
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
\
+--hoodie-conf hoodie.datasource.write.partitionpath.field=bucket \
+--hoodie-conf hoodie.deltastreamer.source.gcs.project.id=infra-dev-358110 \
+--hoodie-conf hoodie.deltastreamer.source.gcs.subscription.id=gcs-obj-8-sub-1 \
+--hoodie-conf hoodie.deltastreamer.source.gcs.ack=true \
+--table-type COPY_ON_WRITE \
+--target-base-path file:\/\/\/absolute_path_to/meta-gcs \
+--target-table gcs_meta \
+--continuous \
+--source-limit 100 \
+--min-sync-interval-seconds 100 \
+--enable-hive-sync \
Review Comment:
This can go to a blog post. It's ok to keep it commented in the code for
reference, but make sure to keep it updated as and when something changes.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
+import java.io.IOException;
+import java.util.List;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+/**
+ * Fetch messages from a specified Google Cloud Pubsub subscription.
+ */
+public class PubsubMessagesFetcher {
Review Comment:
Sure, let's track it as a follow-up.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and
Mysql is used for Hive Metastore).
+
+
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
Review Comment:
Ok, let's make sure that these are documented in the Hudi docs on the website
as well.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MessageBatch.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import java.util.List;
+
+/**
+ * A batch of messages fetched from Google Cloud Pubsub within the metadata
fetcher of
+ * Incremental GCS ingestion module.
+ */
+public class MessageBatch {
Review Comment:
I suggest to use this in S3 as well. But it can be taken up as a follow up
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and
Mysql is used for Hive Metastore).
+
+
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of
optional hive sync flags at the end.
+ $ bin/spark-submit \
+ --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+ --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+ --driver-memory 4g \
+ --executor-memory 4g \
+ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+ absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+ --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+ --op INSERT \
+ --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+ --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+ --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+ --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+ --hoodie-conf
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
\
+ --filter-dupes \
+ --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+ --hoodie-conf hoodie.combine.before.insert=true \
+ --source-ordering-field id \
+ --table-type COPY_ON_WRITE \
+ --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+ --target-table gcs_data \
+ --continuous \
+ --source-limit 100 \
+ --min-sync-interval-seconds 60 \
+ --hoodie-conf
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
\
+ --hoodie-conf
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
\
+ --enable-hive-sync \
+ --hoodie-conf hoodie.datasource.hive_sync.database=default \
+ --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+ private final String srcPath;
+ private final boolean checkIfFileExists;
+ private final int numInstantsPerFetch;
+
+ private final MissingCheckpointStrategy missingCheckpointStrategy;
+ private final FilePathsFetcher filePathsFetcher;
+ private final FileDataFetcher fileDataFetcher;
+
+ private static final Logger LOG =
LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+ public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext
jsc, SparkSession spark,
+ SchemaProvider schemaProvider) {
+
+ this(props, jsc, spark, schemaProvider,
+ new FilePathsFetcher(props, getSourceFileFormat(props)),
+ new FileDataFetcher(props, props.getString(DATAFILE_FORMAT,
DEFAULT_SOURCE_FILE_FORMAT))
+ );
+ }
+
+ GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc,
SparkSession spark,
+ SchemaProvider schemaProvider, FilePathsFetcher
filePathsFetcher, FileDataFetcher fileDataFetcher) {
+ super(props, jsc, spark, schemaProvider);
+
+ DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(HOODIE_SRC_BASE_PATH));
+ srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+ missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+ numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH,
DEFAULT_NUM_INSTANTS_PER_FETCH);
+ checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK,
DEFAULT_ENABLE_EXISTS_CHECK);
+
+ this.filePathsFetcher = filePathsFetcher;
+ this.fileDataFetcher = fileDataFetcher;
+
+ addGcsAccessConfs(jsc);
+
+ LOG.info("srcPath: " + srcPath);
+ LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
+ LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
+ LOG.info("checkIfFileExists: " + checkIfFileExists);
+ }
+
+ @Override
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
+ QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+ if (queryInfo.areStartAndEndInstantsEqual()) {
+ LOG.info("Already caught up. Begin Checkpoint was: " +
queryInfo.getStartInstant());
+ return Pair.of(Option.empty(), queryInfo.getStartInstant());
+ }
+
+ Dataset<Row> sourceForFilenames =
queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+ if (sourceForFilenames.isEmpty()) {
+ LOG.info("Source of file names is empty. Returning empty result and
endInstant: "
+ + queryInfo.getEndInstant());
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
+ }
+
+ return extractData(queryInfo, sourceForFilenames);
+ }
+
+ private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo,
Dataset<Row> sourceForFilenames) {
+ List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext,
sourceForFilenames, checkIfFileExists);
+
+ LOG.debug("Extracted " + filepaths.size() + " distinct files."
+ + " Some samples " +
filepaths.stream().limit(10).collect(Collectors.toList()));
+
+ Option<Dataset<Row>> fileDataRows =
fileDataFetcher.fetchFileData(sparkSession, filepaths, props);
+ return Pair.of(fileDataRows, queryInfo.getEndInstant());
+ }
+
+ private QueryInfo getQueryInfo(Option<String> lastCkptStr) {
+ Option<String> beginInstant = getBeginInstant(lastCkptStr);
+
+ Pair<String, Pair<String, String>> queryInfoPair =
calculateBeginAndEndInstants(
+ sparkContext, srcPath, numInstantsPerFetch, beginInstant,
missingCheckpointStrategy
+ );
+
+ QueryInfo queryInfo = new QueryInfo(queryInfoPair.getLeft(),
queryInfoPair.getRight().getLeft(),
+ queryInfoPair.getRight().getRight());
+
+ if (LOG.isDebugEnabled()) {
+ queryInfo.logDetails();
+ }
+
+ return queryInfo;
+ }
+
+ private Option<String> getBeginInstant(Option<String> lastCheckpoint) {
+ if (lastCheckpoint.isPresent() && !isNullOrEmpty(lastCheckpoint.get())) {
+ return lastCheckpoint;
+ }
+
+ return Option.empty();
+ }
+
+ private static MissingCheckpointStrategy
getMissingCheckpointStrategy(TypedProperties props) {
Review Comment:
I think it's ok to change the visibility of methods/constants in the parent
class if it enhances reusability.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and
Mysql is used for Hive Metastore).
+
+
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of
optional hive sync flags at the end.
+ $ bin/spark-submit \
+ --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+ --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+ --driver-memory 4g \
+ --executor-memory 4g \
+ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+ absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+ --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+ --op INSERT \
+ --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+ --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+ --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+ --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+ --hoodie-conf
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
\
+ --filter-dupes \
+ --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+ --hoodie-conf hoodie.combine.before.insert=true \
+ --source-ordering-field id \
+ --table-type COPY_ON_WRITE \
+ --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+ --target-table gcs_data \
+ --continuous \
+ --source-limit 100 \
+ --min-sync-interval-seconds 60 \
+ --hoodie-conf
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
\
+ --hoodie-conf
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
\
+ --enable-hive-sync \
+ --hoodie-conf hoodie.datasource.hive_sync.database=default \
+ --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+ private final String srcPath;
+ private final boolean checkIfFileExists;
+ private final int numInstantsPerFetch;
+
+ private final MissingCheckpointStrategy missingCheckpointStrategy;
+ private final FilePathsFetcher filePathsFetcher;
+ private final FileDataFetcher fileDataFetcher;
+
+ private static final Logger LOG =
LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+ public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext
jsc, SparkSession spark,
+ SchemaProvider schemaProvider) {
+
+ this(props, jsc, spark, schemaProvider,
+ new FilePathsFetcher(props, getSourceFileFormat(props)),
+ new FileDataFetcher(props, props.getString(DATAFILE_FORMAT,
DEFAULT_SOURCE_FILE_FORMAT))
+ );
+ }
+
+ GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc,
SparkSession spark,
+ SchemaProvider schemaProvider, FilePathsFetcher
filePathsFetcher, FileDataFetcher fileDataFetcher) {
+ super(props, jsc, spark, schemaProvider);
+
+ DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(HOODIE_SRC_BASE_PATH));
+ srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+ missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+ numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH,
DEFAULT_NUM_INSTANTS_PER_FETCH);
+ checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK,
DEFAULT_ENABLE_EXISTS_CHECK);
+
+ this.filePathsFetcher = filePathsFetcher;
+ this.fileDataFetcher = fileDataFetcher;
+
+ addGcsAccessConfs(jsc);
+
+ LOG.info("srcPath: " + srcPath);
+ LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
+ LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
+ LOG.info("checkIfFileExists: " + checkIfFileExists);
+ }
+
+ @Override
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
+ QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+ if (queryInfo.areStartAndEndInstantsEqual()) {
+ LOG.info("Already caught up. Begin Checkpoint was: " +
queryInfo.getStartInstant());
+ return Pair.of(Option.empty(), queryInfo.getStartInstant());
+ }
+
+ Dataset<Row> sourceForFilenames =
queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+ if (sourceForFilenames.isEmpty()) {
+ LOG.info("Source of file names is empty. Returning empty result and
endInstant: "
+ + queryInfo.getEndInstant());
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
+ }
+
+ return extractData(queryInfo, sourceForFilenames);
+ }
+
+ private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo,
Dataset<Row> sourceForFilenames) {
+ List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext,
sourceForFilenames, checkIfFileExists);
+
+ LOG.debug("Extracted " + filepaths.size() + " distinct files."
+ + " Some samples " +
filepaths.stream().limit(10).collect(Collectors.toList()));
+
+ Option<Dataset<Row>> fileDataRows =
fileDataFetcher.fetchFileData(sparkSession, filepaths, props);
+ return Pair.of(fileDataRows, queryInfo.getEndInstant());
+ }
+
+ private QueryInfo getQueryInfo(Option<String> lastCkptStr) {
+ Option<String> beginInstant = getBeginInstant(lastCkptStr);
+
+ Pair<String, Pair<String, String>> queryInfoPair =
calculateBeginAndEndInstants(
+ sparkContext, srcPath, numInstantsPerFetch, beginInstant,
missingCheckpointStrategy
+ );
+
+ QueryInfo queryInfo = new QueryInfo(queryInfoPair.getLeft(),
queryInfoPair.getRight().getLeft(),
+ queryInfoPair.getRight().getRight());
+
+ if (LOG.isDebugEnabled()) {
+ queryInfo.logDetails();
+ }
+
+ return queryInfo;
+ }
+
+ private Option<String> getBeginInstant(Option<String> lastCheckpoint) {
+ if (lastCheckpoint.isPresent() && !isNullOrEmpty(lastCheckpoint.get())) {
+ return lastCheckpoint;
+ }
+
+ return Option.empty();
+ }
+
+ private static MissingCheckpointStrategy
getMissingCheckpointStrategy(TypedProperties props) {
+ boolean readLatestOnMissingCkpt = props.getBoolean(
+ READ_LATEST_INSTANT_ON_MISSING_CKPT,
DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+
+ if (readLatestOnMissingCkpt) {
+ return MissingCheckpointStrategy.READ_LATEST;
+ }
+
+ if (props.containsKey(MISSING_CHECKPOINT_STRATEGY)) {
+ return
MissingCheckpointStrategy.valueOf(props.getString(MISSING_CHECKPOINT_STRATEGY));
+ }
+
+ return null;
+ }
+
+ /**
+ * Ref: https://hudi.apache.org/docs/gcs_hoodie
+ */
+ private static void addGcsAccessConfs(JavaSparkContext jsc) {
+ jsc.hadoopConfiguration().set("fs.gs.impl",
GoogleHadoopFileSystem.class.getName());
Review Comment:
hadoop-aws is provided by the runtime environment, e.g. on EMR.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and
Mysql is used for Hive Metastore).
+
+
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of
optional hive sync flags at the end.
+ $ bin/spark-submit \
+ --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+ --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+ --driver-memory 4g \
+ --executor-memory 4g \
+ --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+ absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+ --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+ --op INSERT \
+ --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+ --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+ --hoodie-conf
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+ --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+ --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+ --hoodie-conf
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
\
+ --filter-dupes \
+ --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+ --hoodie-conf hoodie.combine.before.insert=true \
+ --source-ordering-field id \
+ --table-type COPY_ON_WRITE \
+ --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+ --target-table gcs_data \
+ --continuous \
+ --source-limit 100 \
+ --min-sync-interval-seconds 60 \
+ --hoodie-conf
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
\
+ --hoodie-conf
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
\
+ --enable-hive-sync \
+ --hoodie-conf hoodie.datasource.hive_sync.database=default \
+ --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+ private final String srcPath;
+ private final boolean checkIfFileExists;
+ private final int numInstantsPerFetch;
+
+ private final MissingCheckpointStrategy missingCheckpointStrategy;
+ private final FilePathsFetcher filePathsFetcher;
+ private final FileDataFetcher fileDataFetcher;
+
+ private static final Logger LOG =
LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
/**
 * Entry point used by DeltaStreamer (via reflection); wires up the default fetchers.
 * The FilePathsFetcher selects GCS object paths from the metadata table, and the
 * FileDataFetcher reads the actual file contents.
 *
 * Note: DATAFILE_FORMAT falls back to DEFAULT_SOURCE_FILE_FORMAT when unset
 * — presumably intentional so both formats default the same way; TODO confirm.
 */
public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
    SchemaProvider schemaProvider) {

  this(props, jsc, spark, schemaProvider,
      new FilePathsFetcher(props, getSourceFileFormat(props)),
      new FileDataFetcher(props, props.getString(DATAFILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT))
  );
}
+
/**
 * Package-private constructor that also accepts the fetchers, so tests can inject mocks.
 *
 * @param props            deltastreamer properties; HOODIE_SRC_BASE_PATH is mandatory
 * @param jsc              spark context; its hadoop conf is mutated to enable GCS access
 * @param spark            spark session used for incremental queries
 * @param schemaProvider   schema provider passed through to the parent source
 * @param filePathsFetcher extracts GCS file paths from the metadata table rows
 * @param fileDataFetcher  downloads the contents of those GCS files as rows
 */
GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark,
    SchemaProvider schemaProvider, FilePathsFetcher filePathsFetcher, FileDataFetcher fileDataFetcher) {
  super(props, jsc, spark, schemaProvider);

  // Fail fast if the metadata-table base path is missing; everything else has defaults.
  DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
  srcPath = props.getString(HOODIE_SRC_BASE_PATH);
  missingCheckpointStrategy = getMissingCheckpointStrategy(props);
  numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
  checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK, DEFAULT_ENABLE_EXISTS_CHECK);

  this.filePathsFetcher = filePathsFetcher;
  this.fileDataFetcher = fileDataFetcher;

  // Side effect: registers the GCS filesystem implementation on the shared hadoop conf.
  addGcsAccessConfs(jsc);

  LOG.info("srcPath: " + srcPath);
  LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
  LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
  LOG.info("checkIfFileExists: " + checkIfFileExists);
}
+
+ @Override
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
+ QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+ if (queryInfo.areStartAndEndInstantsEqual()) {
+ LOG.info("Already caught up. Begin Checkpoint was: " +
queryInfo.getStartInstant());
+ return Pair.of(Option.empty(), queryInfo.getStartInstant());
+ }
+
+ Dataset<Row> sourceForFilenames =
queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+ if (sourceForFilenames.isEmpty()) {
+ LOG.info("Source of file names is empty. Returning empty result and
endInstant: "
+ + queryInfo.getEndInstant());
+ return Pair.of(Option.empty(), queryInfo.getEndInstant());
+ }
+
+ return extractData(queryInfo, sourceForFilenames);
+ }
+
+ private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo,
Dataset<Row> sourceForFilenames) {
+ List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext,
sourceForFilenames, checkIfFileExists);
+
+ LOG.debug("Extracted " + filepaths.size() + " distinct files."
+ + " Some samples " +
filepaths.stream().limit(10).collect(Collectors.toList()));
+
+ Option<Dataset<Row>> fileDataRows =
fileDataFetcher.fetchFileData(sparkSession, filepaths, props);
+ return Pair.of(fileDataRows, queryInfo.getEndInstant());
+ }
+
+ private QueryInfo getQueryInfo(Option<String> lastCkptStr) {
+ Option<String> beginInstant = getBeginInstant(lastCkptStr);
+
+ Pair<String, Pair<String, String>> queryInfoPair =
calculateBeginAndEndInstants(
+ sparkContext, srcPath, numInstantsPerFetch, beginInstant,
missingCheckpointStrategy
+ );
+
+ QueryInfo queryInfo = new QueryInfo(queryInfoPair.getLeft(),
queryInfoPair.getRight().getLeft(),
+ queryInfoPair.getRight().getRight());
+
+ if (LOG.isDebugEnabled()) {
+ queryInfo.logDetails();
+ }
+
+ return queryInfo;
+ }
+
+ private Option<String> getBeginInstant(Option<String> lastCheckpoint) {
+ if (lastCheckpoint.isPresent() && !isNullOrEmpty(lastCheckpoint.get())) {
+ return lastCheckpoint;
+ }
+
+ return Option.empty();
+ }
+
+ private static MissingCheckpointStrategy
getMissingCheckpointStrategy(TypedProperties props) {
+ boolean readLatestOnMissingCkpt = props.getBoolean(
+ READ_LATEST_INSTANT_ON_MISSING_CKPT,
DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+
+ if (readLatestOnMissingCkpt) {
+ return MissingCheckpointStrategy.READ_LATEST;
+ }
+
+ if (props.containsKey(MISSING_CHECKPOINT_STRATEGY)) {
+ return
MissingCheckpointStrategy.valueOf(props.getString(MISSING_CHECKPOINT_STRATEGY));
+ }
+
+ return null;
+ }
+
+ /**
+ * Ref: https://hudi.apache.org/docs/gcs_hoodie
+ */
+ private static void addGcsAccessConfs(JavaSparkContext jsc) {
+ jsc.hadoopConfiguration().set("fs.gs.impl",
GoogleHadoopFileSystem.class.getName());
Review Comment:
@pramodbiligiri we can actually remove the setting here and document that
users need to set all these configs as this doc asks the user anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]