codope commented on code in PR #6665:
URL: https://github.com/apache/hudi/pull/6665#discussion_r977747273


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
+import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity;
+import org.apache.hudi.utilities.sources.helpers.gcs.MetadataMsg;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ACK_MESSAGES;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ACK_MESSAGES_DEFAULT_VALUE;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.BATCH_SIZE_CONF;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_BATCH_SIZE;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GOOGLE_PROJECT_ID;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.PUBSUB_SUBSCRIPTION_ID;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/*
+ * An incremental source to fetch from a Google Cloud Pubsub topic (a 
subscription, to be precise),
+ * and download them into a Hudi table. The messages are assumed to be of type 
Cloud Storage Pubsub Notification.
+ *
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+This class can be invoked via spark-submit as follows. There's a bunch of 
optional hive sync flags at the end:
+$ bin/spark-submit \
+--driver-memory 4g \
+--executor-memory 4g \
+--packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+--source-class org.apache.hudi.utilities.sources.GcsEventsSource \
+--op INSERT \
+--hoodie-conf hoodie.datasource.write.recordkey.field="id" \
+--source-ordering-field timeCreated \
+--hoodie-conf hoodie.index.type=GLOBAL_BLOOM \
+--filter-dupes \
+--allow-commit-on-no-checkpoint-change \
+--hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+--hoodie-conf hoodie.combine.before.insert=true \
+--hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
 \
+--hoodie-conf hoodie.datasource.write.partitionpath.field=bucket \
+--hoodie-conf hoodie.deltastreamer.source.gcs.project.id=infra-dev-358110 \
+--hoodie-conf hoodie.deltastreamer.source.gcs.subscription.id=gcs-obj-8-sub-1 \
+--hoodie-conf hoodie.deltastreamer.source.gcs.ack=true \
+--table-type COPY_ON_WRITE \
+--target-base-path file:\/\/\/absolute_path_to/meta-gcs \
+--target-table gcs_meta \
+--continuous \
+--source-limit 100 \
+--min-sync-interval-seconds 100 \
+--enable-hive-sync \

Review Comment:
   Why would we want to enable hive sync for metadata table?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing 
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as 
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar

Review Comment:
   Are all these runtime jars absolutely necessary?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+/**
+ * Uses the start and end instants of a DeltaStreamer Source to help construct 
the right kind
+ * of query for subsequent requests.
+ */
+public class QueryInfo {
+
+  private final String queryType;
+  private final String startInstant;
+  private final String endInstant;
+
+  private static final Logger LOG = LogManager.getLogger(QueryInfo.class);
+
+  public QueryInfo(Pair<String, Pair<String, String>> queryInfoPair) {
+    this.queryType = queryInfoPair.getLeft();
+    this.startInstant = queryInfoPair.getRight().getLeft();
+    this.endInstant = queryInfoPair.getRight().getRight();
+  }
+
+  public Dataset<Row> initializeSourceForFilenames(String srcPath, 
SparkSession sparkSession) {
+    if (isIncremental()) {
+      return incrementalQuery(sparkSession).load(srcPath);
+    }
+
+    // Issue a snapshot query.
+    return snapshotQuery(sparkSession).load(srcPath)
+            .filter(String.format("%s > '%s'", 
HoodieRecord.COMMIT_TIME_METADATA_FIELD, getStartInstant()));
+  }
+
+  public boolean areStartAndEndInstantsEqual() {
+    return getStartInstant().equals(getEndInstant());

Review Comment:
   are startInstant and endInstant non-nullable?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
+import java.io.IOException;
+import java.util.List;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+/**
+ * Fetch messages from a specified Google Cloud Pubsub subscription.
+ */
+public class PubsubMessagesFetcher {
+
+  private final String googleProjectId;
+  private final String pubsubSubscriptionId;
+
+  private final int batchSize;
+  private final SubscriberStubSettings subscriberStubSettings;
+
+  private final int maxInboundMessageSize;

Review Comment:
   convert to local variable in the constructor.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMsg.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.pubsub.v1.PubsubMessage;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_PROCESS;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/**
+ * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications 
(CSPN), and
+ * adds relevant helper methods.
+ * For details of CSPN messages see: 
https://cloud.google.com/storage/docs/pubsub-notifications
+ */
+public class MetadataMsg {
+
+  // The CSPN message to wrap
+  private final PubsubMessage message;
+
+  private static final String EVENT_NAME_OBJECT_FINALIZE = "OBJECT_FINALIZE";
+
+  private static final String ATTR_EVENT_TYPE = "eventType";
+  private static final String ATTR_OBJECT_ID = "objectId";
+  private static final String ATTR_OVERWROTE_GENERATION = 
"overwroteGeneration";
+
+  public MetadataMsg(PubsubMessage message) {
+    this.message = message;
+  }
+
+  public String toStringUtf8() {
+    return message.getData().toStringUtf8();
+  }
+
+  /**
+   * Whether a message is valid to be ingested and stored by this Metadata 
puller.
+   * Ref: https://cloud.google.com/storage/docs/pubsub-notifications#events
+   */
+  public MessageValidity shouldBeProcessed() {
+    if (!isNewFileCreation()) {
+      return DO_SKIP.setReasonToSkip(
+              "eventType: " + getEventType() + ". Not a file creation message."
+      );
+    }
+
+    if (isOverwriteOfExistingFile()) {
+      return DO_SKIP.setReasonToSkip(
+              "eventType: " + getEventType()
+                      + ". Overwrite of existing objectId: " + getObjectId()
+                      + " with generation numner: " + getOverwroteGeneration()
+      );
+    }
+
+    return DO_PROCESS;
+  }
+
+  /**
+   * Whether message represents an overwrite of an existing file.
+   * Ref: 
https://cloud.google.com/storage/docs/pubsub-notifications#replacing_objects
+   */
+  private boolean isOverwriteOfExistingFile() {
+    return !isNullOrEmpty(getOverwroteGeneration());
+  }
+
+  /**
+   * Returns true if message corresponds to new file creation, false if not.
+   * Ref: https://cloud.google.com/storage/docs/pubsub-notifications#events
+   */
+  private boolean isNewFileCreation() {

Review Comment:
   this can be static as well



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsIngestionConfig.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+/**
+ * Config keys and defaults for GCS Ingestion
+ */
+public class GcsIngestionConfig {
+
+  /**
+   * The GCP Project Id where the Pubsub Subscription to ingest from resides. 
Needed to connect
+   * to the Pubsub subscription
+   */
+  public static final String GOOGLE_PROJECT_ID = 
"hoodie.deltastreamer.source.gcs.project.id";
+
+  /**
+   * The GCP Pubsub subscription id for the GCS Notifications. Needed to 
connect to the Pubsub
+   * subscription.
+   */
+  public static final String PUBSUB_SUBSCRIPTION_ID = 
"hoodie.deltastreamer.source.gcs.subscription.id";
+
+  /**
+   * How many messages to pull from Cloud Pubsub at a time. Also see {@link 
DEFAULT_BATCH_SIZE}.

Review Comment:
   nit: `DEFAULT_BATCH_SIZE` -> `#DEFAULT_BATCH_SIZE` for proper comment 
rendering. Similarly for below configs.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
+
+/**
+ * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset 
as input.
+ * Optionally:
+ * i) Match the filename and path against provided input filter strings
+ * ii) Check if each file exists on GCS, in which case it assumes SparkContext 
is already
+ * configured with GCS options through 
GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FilePathsFetcher implements Serializable {
+
+  /**
+   * The default file format to assume if {@link 
GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION} is not given.

Review Comment:
   nit: `GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION` -> 
`GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION` for proper comment rendering. 
Same below.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMsg.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.pubsub.v1.PubsubMessage;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_PROCESS;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/**
+ * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications 
(CSPN), and
+ * adds relevant helper methods.
+ * For details of CSPN messages see: 
https://cloud.google.com/storage/docs/pubsub-notifications
+ */
+public class MetadataMsg {

Review Comment:
   Please consider changing this class to be a helper class without requiring 
instantiation. All methods can be static if the PubSub message is passed to it. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing 
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as 
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of 
optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
 \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
 \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;
+  private final FilePathsFetcher filePathsFetcher;
+  private final FileDataFetcher fileDataFetcher;
+
+  private static final Logger LOG = 
LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext 
jsc, SparkSession spark,
+                                   SchemaProvider schemaProvider) {
+
+    this(props, jsc, spark, schemaProvider,
+            new FilePathsFetcher(props, props.getString(SOURCE_FILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT)),
+            new FileDataFetcher(props, props.getString(DATAFILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT))
+    );
+  }
+
+  GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, 
SparkSession spark,
+                            SchemaProvider schemaProvider, FilePathsFetcher 
filePathsFetcher, FileDataFetcher fileDataFetcher) {
+    super(props, jsc, spark, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props, 
Collections.singletonList(HOODIE_SRC_BASE_PATH));
+    srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+    fileFormat = props.getString(SOURCE_FILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT);
+    missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+    numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, 
DEFAULT_NUM_INSTANTS_PER_FETCH);
+    checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK, 
DEFAULT_ENABLE_EXISTS_CHECK);
+
+    this.filePathsFetcher = filePathsFetcher;
+    this.fileDataFetcher = fileDataFetcher;
+
+    addGcsAccessConfs(jsc);
+
+    LOG.info("srcPath: " + srcPath);
+    LOG.info("fileFormat: " + fileFormat);
+    LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
+    LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
+    LOG.info("checkIfFileExists: " + checkIfFileExists);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCkptStr, long sourceLimit) {
+    QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+    if (queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.info("Already caught up. Begin Checkpoint was: " + 
queryInfo.getStartInstant());
+      return Pair.of(Option.empty(), queryInfo.getStartInstant());
+    }
+
+    Dataset<Row> sourceForFilenames = 
queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+    if (sourceForFilenames.isEmpty()) {
+      LOG.info("Source of file names is empty. Returning empty result and 
endInstant: "
+              + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+    }
+
+    return extractData(queryInfo, sourceForFilenames);
+  }
+
+  private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, 
Dataset<Row> sourceForFilenames) {
+    List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext, 
sourceForFilenames, checkIfFileExists);
+
+    LOG.info("Extracted " + filepaths.size() + " distinct files."
+            + " Some samples " + 
filepaths.stream().limit(10).collect(Collectors.toList()));
+
+    Option<Dataset<Row>> fileDataDs = 
fileDataFetcher.fetchFileDataFromGcs(sparkSession, filepaths);

Review Comment:
   nit: rename to `fileDataRows`



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing 
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as 
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of 
optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
 \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
 \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;

Review Comment:
   Even these can be final right? Are they going to change throughout the 
lifecyle of ingration?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing 
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as 
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of 
optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
 \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
 \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;

Review Comment:
   even this can be final.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing 
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as 
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of 
optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
 \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
 \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;
+  private final FilePathsFetcher filePathsFetcher;
+  private final FileDataFetcher fileDataFetcher;
+
+  private static final Logger LOG = 
LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext 
jsc, SparkSession spark,
+                                   SchemaProvider schemaProvider) {
+
+    this(props, jsc, spark, schemaProvider,
+            new FilePathsFetcher(props, props.getString(SOURCE_FILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT)),
+            new FileDataFetcher(props, props.getString(DATAFILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT))
+    );
+  }
+
+  GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, 
SparkSession spark,
+                            SchemaProvider schemaProvider, FilePathsFetcher 
filePathsFetcher, FileDataFetcher fileDataFetcher) {
+    super(props, jsc, spark, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props, 
Collections.singletonList(HOODIE_SRC_BASE_PATH));
+    srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+    fileFormat = props.getString(SOURCE_FILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT);
+    missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+    numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, 
DEFAULT_NUM_INSTANTS_PER_FETCH);
+    checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK, 
DEFAULT_ENABLE_EXISTS_CHECK);
+
+    this.filePathsFetcher = filePathsFetcher;
+    this.fileDataFetcher = fileDataFetcher;
+
+    addGcsAccessConfs(jsc);
+
+    LOG.info("srcPath: " + srcPath);
+    LOG.info("fileFormat: " + fileFormat);
+    LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
+    LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
+    LOG.info("checkIfFileExists: " + checkIfFileExists);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCkptStr, long sourceLimit) {
+    QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+    if (queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.info("Already caught up. Begin Checkpoint was: " + 
queryInfo.getStartInstant());
+      return Pair.of(Option.empty(), queryInfo.getStartInstant());
+    }
+
+    Dataset<Row> sourceForFilenames = 
queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+    if (sourceForFilenames.isEmpty()) {
+      LOG.info("Source of file names is empty. Returning empty result and 
endInstant: "
+              + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+    }
+
+    return extractData(queryInfo, sourceForFilenames);
+  }
+
+  private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, 
Dataset<Row> sourceForFilenames) {
+    List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext, 
sourceForFilenames, checkIfFileExists);
+
+    LOG.info("Extracted " + filepaths.size() + " distinct files."
+            + " Some samples " + 
filepaths.stream().limit(10).collect(Collectors.toList()));

Review Comment:
   Why limit 10 is hardcoded? Should it be configurable?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMsg.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.pubsub.v1.PubsubMessage;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_PROCESS;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/**
+ * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications 
(CSPN), and
+ * adds relevant helper methods.
+ * For details of CSPN messages see: 
https://cloud.google.com/storage/docs/pubsub-notifications
+ */
+public class MetadataMsg {

Review Comment:
   nit: rename to `MetadataMessage`



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static 
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+/**
+ * Uses the start and end instants of a DeltaStreamer Source to help construct 
the right kind
+ * of query for subsequent requests.
+ */
+public class QueryInfo {
+
+  private final String queryType;
+  private final String startInstant;
+  private final String endInstant;
+
+  private static final Logger LOG = LogManager.getLogger(QueryInfo.class);
+
+  public QueryInfo(Pair<String, Pair<String, String>> queryInfoPair) {

Review Comment:
   let's use class attributes instead of Pair.. more readable that way.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FileDataFetcher.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SPARK_DATASOURCE_OPTIONS;
+
+/**
+ * Connects to GCS from Spark and downloads data from a given list of files.
+ * Assumes SparkContext is already configured with GCS options through 
GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FileDataFetcher implements Serializable {
+
+  private String fileFormat;
+  private TypedProperties props;
+
+  private static final Logger LOG = 
LogManager.getLogger(FileDataFetcher.class);
+  private static final long serialVersionUID = 1L;
+
+  public FileDataFetcher(TypedProperties props, String fileFormat) {
+    this.fileFormat = fileFormat;
+    this.props = props;
+  }
+
+  /**
+   * @param filepaths Files in GCS from which to fetch data
+   * @return Data in the given list of files, as a Spark DataSet
+   */
+  public Option<Dataset<Row>> fetchFileDataFromGcs(SparkSession spark, 
List<String> filepaths) {

Review Comment:
   We can reuse the corresponding method from S3 incr source. Let's move to a 
util class?



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GOOGLE_PROJECT_ID;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.PUBSUB_SUBSCRIPTION_ID;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestGcsEventsSource extends UtilitiesTestBase {
+
+  @Mock
+  PubsubMessagesFetcher pubsubMessagesFetcher;
+
+  protected FilebasedSchemaProvider schemaProvider;
+  private TypedProperties props;
+
+  private static final String CHECKPOINT_VALUE_ZERO = "0";
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    UtilitiesTestBase.initTestServices(false, false);
+  }
+
+  @AfterAll
+  public static void afterAll() {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    super.setup();
+    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), 
jsc);
+    MockitoAnnotations.initMocks(this);
+
+    props = new TypedProperties();
+    props.put(GOOGLE_PROJECT_ID, "dummy-project");
+    props.put(PUBSUB_SUBSCRIPTION_ID, "dummy-subscription");
+  }
+
+  @AfterEach
+  public void afterEach() throws Exception {
+    super.teardown();
+  }
+
+  @Test
+  public void shouldReturnEmptyOnNoMessages() {
+    
when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Collections.emptyList());
+
+    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, 
null,
+            pubsubMessagesFetcher);
+
+    Pair<Option<Dataset<Row>>, String> expected = Pair.of(Option.empty(), "0");
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = 
source.fetchNextBatch(Option.of("0"), 100);
+
+    assertEquals(expected, dataAndCheckpoint);
+  }
+
+  @Test
+  public void shouldReturnDataOnValidMessages() throws IOException {

Review Comment:
   let's remove the unused exceptions from all tests.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsIngestionConfig.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+/**
+ * Config keys and defaults for GCS Ingestion
+ */
+public class GcsIngestionConfig {
+
+  /**
+   * The GCP Project Id where the Pubsub Subscription to ingest from resides. 
Needed to connect
+   * to the Pubsub subscription
+   */
+  public static final String GOOGLE_PROJECT_ID = 
"hoodie.deltastreamer.source.gcs.project.id";
+
+  /**
+   * The GCP Pubsub subscription id for the GCS Notifications. Needed to 
connect to the Pubsub
+   * subscription.
+   */
+  public static final String PUBSUB_SUBSCRIPTION_ID = 
"hoodie.deltastreamer.source.gcs.subscription.id";
+
+  /**
+   * How many messages to pull from Cloud Pubsub at a time. Also see {@link 
DEFAULT_BATCH_SIZE}.
+   */
+  public static final String BATCH_SIZE_CONF = 
"hoodie.deltastreamer.source.gcs.batch.size";
+
+  /**
+   * Provide a reasonable setting for default batch size.
+   * If batch size is too big, two possible issues can happen:
+   * i) Acknowledgement takes too long (given that Hudi needs to commit 
first). That means Pubsub
+   * will keep delivering the same message since it wasn't acked in time.
+   * ii) The size of the request that acks outstanding messages may exceed the 
limit,
+   * which is 512KB as per Google's docs. See: 
https://cloud.google.com/pubsub/quotas#resource_limits
+   */
+  public static final int DEFAULT_BATCH_SIZE = 10;
+
+  // Size of inbound messages when pulling data, in bytes
+  public static final int DEFAULT_MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; 
// bytes
+
+  /**
+   * Whether to acknowledge messages or not. Not acknowledging means Pubsub 
will keep redelivering the
+   * same messages. In Prod this should always be true. So this is mainly 
useful during dev and testing.
+   */
+  public static final String ACK_MESSAGES = 
"hoodie.deltastreamer.source.gcs.ack";
+
+  /**
+   * Default value for {@link ACK_MESSAGES}
+   */
+  public static final boolean ACK_MESSAGES_DEFAULT_VALUE = true;
+
+  /**
+   * Check whether file exists before attempting to pull it
+   */
+  public static final String ENABLE_EXISTS_CHECK = 
"hoodie.deltastreamer.source.gcsincr.check.file.exists";

Review Comment:
   nit: change `gcsincr` to `gcs.incr`



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing 
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as 
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of 
optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
 \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
 \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;

Review Comment:
   Looks like `fileFormat` is not being used. Either remove it or pass it to 
the fetchers.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMsg.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.pubsub.v1.PubsubMessage;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_PROCESS;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.DO_SKIP;
+
+/**
+ * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications 
(CSPN), and
+ * adds relevant helper methods.
+ * For details of CSPN messages see: 
https://cloud.google.com/storage/docs/pubsub-notifications
+ */
+public class MetadataMsg {
+
+  // The CSPN message to wrap
+  private final PubsubMessage message;
+
+  private static final String EVENT_NAME_OBJECT_FINALIZE = "OBJECT_FINALIZE";
+
+  private static final String ATTR_EVENT_TYPE = "eventType";
+  private static final String ATTR_OBJECT_ID = "objectId";
+  private static final String ATTR_OVERWROTE_GENERATION = 
"overwroteGeneration";
+
+  public MetadataMsg(PubsubMessage message) {
+    this.message = message;
+  }
+
+  public String toStringUtf8() {

Review Comment:
   this can be static method if we pass the PubSub message as an argument.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
+import java.io.IOException;
+import java.util.List;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+/**
+ * Fetch messages from a specified Google Cloud Pubsub subscription.
+ */
+public class PubsubMessagesFetcher {

Review Comment:
   Why not subclass `CloudObjectsSelector`? I know it has certain S3 specific 
things but that can be refactored. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
+import java.io.IOException;
+import java.util.List;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+/**
+ * Fetch messages from a specified Google Cloud Pubsub subscription.
+ */
+public class PubsubMessagesFetcher {
+
+  private final String googleProjectId;
+  private final String pubsubSubscriptionId;
+
+  private final int batchSize;
+  private final SubscriberStubSettings subscriberStubSettings;
+
+  private final int maxInboundMessageSize;
+
+  private static final Logger LOG = 
LogManager.getLogger(PubsubMessagesFetcher.class);
+
+  public PubsubMessagesFetcher(String googleProjectId, String 
pubsubSubscriptionId, int batchSize) {
+    this.googleProjectId = googleProjectId;
+    this.pubsubSubscriptionId = pubsubSubscriptionId;
+    this.batchSize = batchSize;
+
+    maxInboundMessageSize = DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+    try {
+      /** For details of timeout and retry configs,
+       * see {@link 
com.google.cloud.pubsub.v1.stub.SubscriberStubSettings#initDefaults()},
+       * and the static code block in SubscriberStubSettings */
+      subscriberStubSettings =
+              SubscriberStubSettings.newBuilder()
+                      .setTransportChannelProvider(
+                              
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
+                                      
.setMaxInboundMessageSize(maxInboundMessageSize)
+                                      .build())
+                      .build();
+    } catch (IOException e) {
+      throw new HoodieException("Error creating subscriber stub settings", e);
+    }
+  }
+
+  public List<ReceivedMessage> fetchMessages() {

Review Comment:
   Does PubSub also have delete events?
   In general, does the two source handle deleting of objects from GCS? There 
were some caveats with S3. See 
https://github.com/apache/hudi/pull/3433#pullrequestreview-728946157 for more 
details.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
+
+/**
+ * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset 
as input.
+ * Optionally:
+ * i) Match the filename and path against provided input filter strings
+ * ii) Check if each file exists on GCS, in which case it assumes SparkContext 
is already
+ * configured with GCS options through 
GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FilePathsFetcher implements Serializable {
+
+  /**
+   * The default file format to assume if {@link 
GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION} is not given.
+   */
+  private final String fileFormat;
+  private final TypedProperties props;
+
+  private static final String GCS_PREFIX = "gs://";
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LogManager.getLogger(FilePathsFetcher.class);
+
+  /**
+   * @param fileFormat The default file format to assume if {@link 
GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION}
+   *                   is not given.
+   */
+  public FilePathsFetcher(TypedProperties props, String fileFormat) {
+    this.props = props;
+    this.fileFormat = fileFormat;
+  }
+
+  /**
+   * @param sourceForFilenames a Dataset that contains metadata about files on 
GCS. Assumed to be a persisted form
+   *                           of a Cloud Storage Pubsub Notification event.
+   * @param checkIfExists      Check if each file exists, before returning its 
full path
+   * @return A list of fully qualifieed GCS file paths.

Review Comment:
   ```suggestion
      * @return A list of fully qualified GCS file paths.
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
+
+/**
+ * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset 
as input.
+ * Optionally:
+ * i) Match the filename and path against provided input filter strings
+ * ii) Check if each file exists on GCS, in which case it assumes SparkContext 
is already
+ * configured with GCS options through 
GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FilePathsFetcher implements Serializable {
+
+  /**
+   * The default file format to assume if {@link 
GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION} is not given.
+   */
+  private final String fileFormat;
+  private final TypedProperties props;
+
+  private static final String GCS_PREFIX = "gs://";
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LogManager.getLogger(FilePathsFetcher.class);
+
+  /**
+   * @param fileFormat The default file format to assume if {@link 
GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION}
+   *                   is not given.
+   */
+  public FilePathsFetcher(TypedProperties props, String fileFormat) {
+    this.props = props;
+    this.fileFormat = fileFormat;
+  }
+
+  /**
+   * @param sourceForFilenames a Dataset that contains metadata about files on 
GCS. Assumed to be a persisted form
+   *                           of a Cloud Storage Pubsub Notification event.
+   * @param checkIfExists      Check if each file exists, before returning its 
full path
+   * @return A list of fully qualifieed GCS file paths.
+   */
+  public List<String> getGcsFilePaths(JavaSparkContext jsc, Dataset<Row> 
sourceForFilenames, boolean checkIfExists) {
+    String filter = createFilter();
+    LOG.info("Adding filter string to Dataset: " + filter);
+
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(
+            jsc.hadoopConfiguration());
+
+    return sourceForFilenames
+            .filter(filter)
+            .select("bucket", "name")
+            .distinct()
+            .rdd().toJavaRDD().mapPartitions(
+                    getCloudFilesPerPartition(serializableConfiguration, 
checkIfExists)
+            ).collect();
+  }
+
+  private FlatMapFunction<Iterator<Row>, String> getCloudFilesPerPartition(
+          SerializableConfiguration serializableConfiguration, boolean 
checkIfExists) {
+
+    return rows -> {
+      List<String> cloudFilesPerPartition = new ArrayList<>();
+      rows.forEachRemaining(row -> {
+        addFileToList(row, cloudFilesPerPartition, serializableConfiguration, 
checkIfExists);
+      });
+
+      return cloudFilesPerPartition.iterator();
+    };
+  }
+
+  private void addFileToList(Row row, List<String> cloudFilesPerPartition,
+                             SerializableConfiguration 
serializableConfiguration, boolean checkIfExists) {
+    final Configuration configuration = serializableConfiguration.newCopy();
+
+    String bucket = row.getString(0);
+    String filePath = GCS_PREFIX + bucket + "/" + row.getString(1);
+
+    try {
+      addCloudFile(GCS_PREFIX, bucket, filePath, cloudFilesPerPartition, 
configuration, checkIfExists);
+    } catch (Exception exception) {
+      LOG.warn(String.format("Failed to add cloud file %s", filePath), 
exception);
+      throw new HoodieException(String.format("Failed to add cloud file %s", 
filePath), exception);
+    }
+  }
+
+  private void addCloudFile(String gcsPrefix, String bucket, String filePath,

Review Comment:
   Can we move these methods to some util class. The logic is similar to S3 and 
there is notthing specific to S3/GCS. The util method can be reused across 
S3/GCS.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing 
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as 
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of 
optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
 \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
 \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;
+  private final FilePathsFetcher filePathsFetcher;
+  private final FileDataFetcher fileDataFetcher;
+
+  private static final Logger LOG = 
LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext 
jsc, SparkSession spark,

Review Comment:
   Do we need two constructors right now?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
+import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher;
+import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
+import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.ENABLE_EXISTS_CHECK;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DATAFILE_FORMAT;
+
+/**
+ * An incremental source that detects new data in a source table containing 
metadata about GCS files,
+ * downloads the actual content of these files from GCS and stores them as 
records into a destination table.
+ * <p>
+ * You should set spark.driver.extraClassPath in spark-defaults.conf to
+ * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options 
if in cluster mode):
+ * (mysql-connector at the end is only needed if Hive Sync is enabled and 
Mysql is used for Hive Metastore).
+
+ 
absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar:
+ absolute_path_to/31.1-jre/guava-31.1-jre.jar:
+ absolute_path_to/mysql-connector-java-8.0.30.jar
+
+ This class can be invoked via spark-submit as follows. There's a bunch of 
optional hive sync flags at the end.
+  $ bin/spark-submit \
+  --packages com.google.cloud:google-cloud-pubsub:1.120.0 \
+  --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \
+  --driver-memory 4g \
+  --executor-memory 4g \
+  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
+  absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
+  --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \
+  --op INSERT \
+  --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.file.extension="jsonl" \
+  --hoodie-conf hoodie.deltastreamer.source.gcsincr.datafile.format="json" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.select.relpath.prefix="country" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.prefix="blah" \
+  --hoodie-conf 
hoodie.deltastreamer.source.gcsincr.ignore.relpath.substring="blah" \
+  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
+  --hoodie-conf hoodie.datasource.write.partitionpath.field= \
+  --hoodie-conf 
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
 \
+  --filter-dupes \
+  --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \
+  --hoodie-conf hoodie.combine.before.insert=true \
+  --source-ordering-field id \
+  --table-type COPY_ON_WRITE \
+  --target-base-path file:\/\/\/absolute_path_to/data-gcs \
+  --target-table gcs_data \
+  --continuous \
+  --source-limit 100 \
+  --min-sync-interval-seconds 60 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs
 \
+  --hoodie-conf 
hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT
 \
+  --enable-hive-sync \
+  --hoodie-conf hoodie.datasource.hive_sync.database=default \
+  --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \
+ */
+public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
+
+  private String srcPath;
+  private String fileFormat;
+  private final boolean checkIfFileExists;
+  private int numInstantsPerFetch;
+
+  private MissingCheckpointStrategy missingCheckpointStrategy;
+  private final FilePathsFetcher filePathsFetcher;
+  private final FileDataFetcher fileDataFetcher;
+
+  private static final Logger LOG = 
LogManager.getLogger(GcsEventsHoodieIncrSource.class);
+
+  public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext 
jsc, SparkSession spark,
+                                   SchemaProvider schemaProvider) {
+
+    this(props, jsc, spark, schemaProvider,
+            new FilePathsFetcher(props, props.getString(SOURCE_FILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT)),
+            new FileDataFetcher(props, props.getString(DATAFILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT))
+    );
+  }
+
+  GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, 
SparkSession spark,
+                            SchemaProvider schemaProvider, FilePathsFetcher 
filePathsFetcher, FileDataFetcher fileDataFetcher) {
+    super(props, jsc, spark, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props, 
Collections.singletonList(HOODIE_SRC_BASE_PATH));
+    srcPath = props.getString(HOODIE_SRC_BASE_PATH);
+    fileFormat = props.getString(SOURCE_FILE_FORMAT, 
DEFAULT_SOURCE_FILE_FORMAT);
+    missingCheckpointStrategy = getMissingCheckpointStrategy(props);
+    numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, 
DEFAULT_NUM_INSTANTS_PER_FETCH);
+    checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK, 
DEFAULT_ENABLE_EXISTS_CHECK);
+
+    this.filePathsFetcher = filePathsFetcher;
+    this.fileDataFetcher = fileDataFetcher;
+
+    addGcsAccessConfs(jsc);
+
+    LOG.info("srcPath: " + srcPath);
+    LOG.info("fileFormat: " + fileFormat);
+    LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
+    LOG.info("numInstantsPerFetch: " + numInstantsPerFetch);
+    LOG.info("checkIfFileExists: " + checkIfFileExists);
+  }
+
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCkptStr, long sourceLimit) {
+    QueryInfo queryInfo = getQueryInfo(lastCkptStr);
+
+    if (queryInfo.areStartAndEndInstantsEqual()) {
+      LOG.info("Already caught up. Begin Checkpoint was: " + 
queryInfo.getStartInstant());
+      return Pair.of(Option.empty(), queryInfo.getStartInstant());
+    }
+
+    Dataset<Row> sourceForFilenames = 
queryInfo.initializeSourceForFilenames(srcPath, sparkSession);
+
+    if (sourceForFilenames.isEmpty()) {
+      LOG.info("Source of file names is empty. Returning empty result and 
endInstant: "
+              + queryInfo.getEndInstant());
+      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+    }
+
+    return extractData(queryInfo, sourceForFilenames);
+  }
+
+  private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, 
Dataset<Row> sourceForFilenames) {
+    List<String> filepaths = filePathsFetcher.getGcsFilePaths(sparkContext, 
sourceForFilenames, checkIfFileExists);
+
+    LOG.info("Extracted " + filepaths.size() + " distinct files."
+            + " Some samples " + 
filepaths.stream().limit(10).collect(Collectors.toList()));
+
+    Option<Dataset<Row>> fileDataDs = 
fileDataFetcher.fetchFileDataFromGcs(sparkSession, filepaths);
+    return Pair.of(fileDataDs, queryInfo.getEndInstant());
+  }
+
+  private QueryInfo getQueryInfo(Option<String> lastCkptStr) {
+    Option<String> beginInstant = getBeginInstant(lastCkptStr);
+
+    QueryInfo queryInfo = new QueryInfo(
+            calculateBeginAndEndInstants(
+                    sparkContext, srcPath, numInstantsPerFetch, beginInstant, 
missingCheckpointStrategy
+            )
+    );
+
+    queryInfo.logDetails();

Review Comment:
   Is it necessary? If it is then better to log in debug mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to