alexeykudinkin commented on code in PR #6386:
URL: https://github.com/apache/hudi/pull/6386#discussion_r946069343


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -290,6 +292,7 @@ public OffsetRange[] getNextOffsetRanges(Option<String> 
lastCheckpointStr, long
       numEvents = sourceLimit;
     }
 
+    // TODO remove

Review Comment:
   Good call



##########
pom.xml:
##########
@@ -97,7 +97,9 @@
     <fasterxml.spark3.version>2.10.0</fasterxml.spark3.version>
     <kafka.version>2.0.0</kafka.version>
     <kafka.spark3.version>2.4.1</kafka.spark3.version>
-    <pulsar.version>2.8.1</pulsar.version>
+    <pulsar.version>2.10.1</pulsar.version>

Review Comment:
   Will revert this, since the client dependency turned out not to be required 
for the connector to work (including it actually breaks the connector)



##########
hudi-utilities/pom.xml:
##########
@@ -171,6 +171,14 @@
       </exclusions>
     </dependency>
 
+    <!-- Pulsar Spark Connector -->
+    <dependency>

Review Comment:
   This will have to be `provided` scope. We don't want to include this in our 
bundle (our bundles are neutral now, so we can't include it in "hudi-utilities-bundle")



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = 
Config.OffsetAutoResetStrategy.valueOf(
+              
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;
+            case FAIL:
+              throw new IllegalArgumentException("No checkpoint has been 
provided!");
+            default:
+              throw new UnsupportedOperationException("Unsupported offset 
auto-reset strategy");
+          }
+        });
+  }
+
+  private void ackOffset(MessageId latestConsumedOffset) {
+    try {
+      pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", 
latestConsumedOffset, topicName), e);
+      throw new HoodieIOException("Failed to ack message for topic", e);
+    }
+  }
+
+  private MessageId fetchLatestOffset() {
+    try {
+      return pulsarConsumer.get().getLastMessageId();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to fetch latest messageId for topic 
'%s'", topicName), e);
+      throw new HoodieIOException("Failed to fetch latest messageId for 
topic", e);
+    }
+  }
+
+  private Consumer<byte[]> subscribeToTopic() {
+    try {
+      // NOTE: We're generating unique subscription-id to make sure that 
subsequent invocation
+      //       of the DS, do not interfere w/ each other
+      String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, 
System.currentTimeMillis());
+      return pulsarClient.get()
+          .newConsumer()
+          .topic(topicName)
+          .subscriptionName(subscriptionId)
+          .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+          .subscriptionType(SubscriptionType.Exclusive)
+          .subscribe();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", 
topicName), e);
+      throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
+    }
+  }
+
+  private PulsarClient initPulsarClient() {
+    try {
+      return PulsarClient.builder()
+          .serviceUrl(serviceEndpointURL)
+          .build();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to init Pulsar client connecting to 
'%s'", serviceEndpointURL), e);
+      throw new HoodieIOException("Failed to init Pulsar client", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdownPulsarClient(pulsarClient.get());
+  }
+
+  private static Long computeTargetRecordLimit(long sourceLimit, 
TypedProperties props) {
+    if (sourceLimit < Long.MAX_VALUE) {
+      return sourceLimit;
+    } else {
+      return 
props.getLong(Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.key(),
+          
Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.defaultValue());
+    }
+  }
+
+  private static String convertToOffsetString(String topic, MessageId 
startingOffset) {
+    return JsonUtils.topicOffsets(
+        HoodieConversionUtils.mapAsScalaImmutableMap(
+            Collections.singletonMap(topic, startingOffset)));
+  }
+
+  private static void shutdownPulsarClient(PulsarClient client) throws 
PulsarClientException {
+    client.close();
+    // NOTE: Current version of Pulsar's client (in Pulsar Spark Connector 
3.1.1.4) is not
+    //       shutting down event-loop group properly, so we had to shut it 
down manually
+    try {
+      EventLoopGroup eventLoopGroup = ((PulsarClientImpl) 
client).eventLoopGroup();
+      if (eventLoopGroup != null) {
+        eventLoopGroup.shutdownGracefully().await();
+      }
+    } catch (InterruptedException e) {
+      // No-op
+    }
+
+    // NOTE: Pulsar clients initialized by the spark-connector, might be left 
not shutdown
+    //       properly (see above). To work this around we employ "nuclear" 
option of
+    //       fetching all Pulsar client threads and interrupting them forcibly 
(to make them
+    //       shutdown)
+    collectActiveThreads().stream().sequential()
+        .filter(t -> t.getName().startsWith("pulsar-client-io"))
+        .forEach(Thread::interrupt);
+  }
+
+  public static class Config {
+    private static final ConfigProperty<String> PULSAR_SOURCE_TOPIC_NAME = 
ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.topic")
+        .noDefaultValue()
+        .withDocumentation("Name of the target Pulsar topic to source data 
from");
+
+    private static final ConfigProperty<String> 
PULSAR_SOURCE_SERVICE_ENDPOINT_URL = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.endpoint.service.url")
+        .defaultValue("pulsar://localhost:6650")
+        .withDocumentation("URL of the target Pulsar endpoint (of the form 
'pulsar://host:port'");
+
+    private static final ConfigProperty<String> 
PULSAR_SOURCE_ADMIN_ENDPOINT_URL = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.endpoint.admin.url")
+        .defaultValue("http://localhost:8080";)
+        .withDocumentation("URL of the target Pulsar endpoint (of the form 
'pulsar://host:port'");
+
+    public enum OffsetAutoResetStrategy {
+      LATEST, EARLIEST, FAIL
+    }
+
+    private static final ConfigProperty<OffsetAutoResetStrategy> 
PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy")

Review Comment:
   I thought about unifying the configs, but unifying configs across sources 
(where it makes sense) is a much bigger project, if it's worth doing at all.
   
   For now I'd rather avoid coupling the sources, unless there's a very clear 
benefit of doing so. 
   
   P.S. `auto.offset.reset` doesn't even follow our own naming convention, for 
example, because it's a Kafka config that we (unfortunately) share



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = 
Config.OffsetAutoResetStrategy.valueOf(
+              
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;
+            case FAIL:
+              throw new IllegalArgumentException("No checkpoint has been 
provided!");
+            default:
+              throw new UnsupportedOperationException("Unsupported offset 
auto-reset strategy");
+          }
+        });
+  }
+
+  private void ackOffset(MessageId latestConsumedOffset) {
+    try {
+      pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", 
latestConsumedOffset, topicName), e);
+      throw new HoodieIOException("Failed to ack message for topic", e);
+    }
+  }
+
+  private MessageId fetchLatestOffset() {
+    try {
+      return pulsarConsumer.get().getLastMessageId();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to fetch latest messageId for topic 
'%s'", topicName), e);
+      throw new HoodieIOException("Failed to fetch latest messageId for 
topic", e);
+    }
+  }
+
+  private Consumer<byte[]> subscribeToTopic() {
+    try {
+      // NOTE: We're generating unique subscription-id to make sure that 
subsequent invocation
+      //       of the DS, do not interfere w/ each other
+      String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, 
System.currentTimeMillis());
+      return pulsarClient.get()
+          .newConsumer()
+          .topic(topicName)
+          .subscriptionName(subscriptionId)
+          .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+          .subscriptionType(SubscriptionType.Exclusive)
+          .subscribe();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", 
topicName), e);
+      throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
+    }
+  }
+
+  private PulsarClient initPulsarClient() {
+    try {
+      return PulsarClient.builder()
+          .serviceUrl(serviceEndpointURL)
+          .build();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to init Pulsar client connecting to 
'%s'", serviceEndpointURL), e);
+      throw new HoodieIOException("Failed to init Pulsar client", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdownPulsarClient(pulsarClient.get());
+  }
+
+  private static Long computeTargetRecordLimit(long sourceLimit, 
TypedProperties props) {
+    if (sourceLimit < Long.MAX_VALUE) {
+      return sourceLimit;
+    } else {
+      return 
props.getLong(Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.key(),
+          
Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.defaultValue());
+    }
+  }
+
+  private static String convertToOffsetString(String topic, MessageId 
startingOffset) {
+    return JsonUtils.topicOffsets(
+        HoodieConversionUtils.mapAsScalaImmutableMap(
+            Collections.singletonMap(topic, startingOffset)));
+  }
+
+  private static void shutdownPulsarClient(PulsarClient client) throws 
PulsarClientException {
+    client.close();
+    // NOTE: Current version of Pulsar's client (in Pulsar Spark Connector 
3.1.1.4) is not
+    //       shutting down event-loop group properly, so we had to shut it 
down manually
+    try {
+      EventLoopGroup eventLoopGroup = ((PulsarClientImpl) 
client).eventLoopGroup();
+      if (eventLoopGroup != null) {
+        eventLoopGroup.shutdownGracefully().await();
+      }
+    } catch (InterruptedException e) {
+      // No-op
+    }
+
+    // NOTE: Pulsar clients initialized by the spark-connector, might be left 
not shutdown
+    //       properly (see above). To work this around we employ "nuclear" 
option of
+    //       fetching all Pulsar client threads and interrupting them forcibly 
(to make them
+    //       shutdown)
+    collectActiveThreads().stream().sequential()
+        .filter(t -> t.getName().startsWith("pulsar-client-io"))
+        .forEach(Thread::interrupt);
+  }
+
+  public static class Config {
+    private static final ConfigProperty<String> PULSAR_SOURCE_TOPIC_NAME = 
ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.topic")
+        .noDefaultValue()
+        .withDocumentation("Name of the target Pulsar topic to source data 
from");
+
+    private static final ConfigProperty<String> 
PULSAR_SOURCE_SERVICE_ENDPOINT_URL = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.endpoint.service.url")
+        .defaultValue("pulsar://localhost:6650")
+        .withDocumentation("URL of the target Pulsar endpoint (of the form 
'pulsar://host:port'");
+
+    private static final ConfigProperty<String> 
PULSAR_SOURCE_ADMIN_ENDPOINT_URL = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.endpoint.admin.url")
+        .defaultValue("http://localhost:8080";)
+        .withDocumentation("URL of the target Pulsar endpoint (of the form 
'pulsar://host:port'");
+
+    public enum OffsetAutoResetStrategy {
+      LATEST, EARLIEST, FAIL
+    }
+
+    private static final ConfigProperty<OffsetAutoResetStrategy> 
PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy")
+        .defaultValue(OffsetAutoResetStrategy.LATEST)
+        .withDocumentation("Policy determining how offsets shall be 
automatically reset in case there's "
+            + "no checkpoint information present");

Review Comment:
   Not sure I understand your point: how is it different from the current 
wording?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = 
Config.OffsetAutoResetStrategy.valueOf(
+              
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;
+            case FAIL:

Review Comment:
   The goal is: if we don't have any checkpoint info (for whatever reason), we 
fail until a checkpoint is provided explicitly by the user, to avoid either 
data loss or extensive re-processing.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))

Review Comment:
   If it's not available, it'd be empty (Option)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = 
Config.OffsetAutoResetStrategy.valueOf(
+              
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;

Review Comment:
   `earliest` is a shortcut for this



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -123,4 +127,15 @@ public InputBatch<Dataset<Row>> 
fetchNewDataInRowFormat(Option<String> lastCkptS
   public Source getSource() {
     return source;
   }
+
+  @Override
+  public void close() {
+    if (source instanceof Closeable) {

Review Comment:
   Good call. Created HUDI-4624



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched

Review Comment:
   > isn't this as simple as setting endingOffset = Math.min(endingOffset, 
startingOffset + maxRecordsLimit);
   
   In theory -- yes, in practice -- no. Pulsar uses opaque offsets 
(`MessageId`) which are not straightforward to shift. I poked their Slack to 
see what we can do about it, but neither their docs nor reading their code 
helped me find a use-case similar to ours where they shift the offset 
by some constant factor.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = 
Config.OffsetAutoResetStrategy.valueOf(
+              
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;
+            case FAIL:
+              throw new IllegalArgumentException("No checkpoint has been 
provided!");
+            default:
+              throw new UnsupportedOperationException("Unsupported offset 
auto-reset strategy");
+          }
+        });
+  }
+
+  private void ackOffset(MessageId latestConsumedOffset) {
+    try {
+      pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", 
latestConsumedOffset, topicName), e);
+      throw new HoodieIOException("Failed to ack message for topic", e);
+    }
+  }
+
+  private MessageId fetchLatestOffset() {
+    try {
+      return pulsarConsumer.get().getLastMessageId();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to fetch latest messageId for topic 
'%s'", topicName), e);
+      throw new HoodieIOException("Failed to fetch latest messageId for 
topic", e);
+    }
+  }
+
+  private Consumer<byte[]> subscribeToTopic() {
+    try {
+      // NOTE: We're generating unique subscription-id to make sure that 
subsequent invocation
+      //       of the DS, do not interfere w/ each other
+      String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, 
System.currentTimeMillis());
+      return pulsarClient.get()
+          .newConsumer()
+          .topic(topicName)
+          .subscriptionName(subscriptionId)
+          .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+          .subscriptionType(SubscriptionType.Exclusive)
+          .subscribe();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", 
topicName), e);
+      throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
+    }
+  }
+
+  private PulsarClient initPulsarClient() {
+    try {
+      return PulsarClient.builder()
+          .serviceUrl(serviceEndpointURL)
+          .build();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to init Pulsar client connecting to 
'%s'", serviceEndpointURL), e);
+      throw new HoodieIOException("Failed to init Pulsar client", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdownPulsarClient(pulsarClient.get());
+  }
+
+  private static Long computeTargetRecordLimit(long sourceLimit, 
TypedProperties props) {
+    if (sourceLimit < Long.MAX_VALUE) {
+      return sourceLimit;
+    } else {
+      return 
props.getLong(Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.key(),
+          
Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.defaultValue());
+    }
+  }
+
+  private static String convertToOffsetString(String topic, MessageId 
startingOffset) {
+    return JsonUtils.topicOffsets(
+        HoodieConversionUtils.mapAsScalaImmutableMap(
+            Collections.singletonMap(topic, startingOffset)));
+  }
+
+  private static void shutdownPulsarClient(PulsarClient client) throws 
PulsarClientException {
+    client.close();
+    // NOTE: Current version of Pulsar's client (in Pulsar Spark Connector 
3.1.1.4) is not
+    //       shutting down event-loop group properly, so we had to shut it 
down manually
+    try {
+      EventLoopGroup eventLoopGroup = ((PulsarClientImpl) 
client).eventLoopGroup();

Review Comment:
   Correct
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = 
Config.OffsetAutoResetStrategy.valueOf(
+              
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;
+            case FAIL:
+              throw new IllegalArgumentException("No checkpoint has been 
provided!");
+            default:
+              throw new UnsupportedOperationException("Unsupported offset 
auto-reset strategy");
+          }
+        });
+  }
+
+  private void ackOffset(MessageId latestConsumedOffset) {
+    try {
+      pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", 
latestConsumedOffset, topicName), e);
+      throw new HoodieIOException("Failed to ack message for topic", e);
+    }
+  }
+
+  private MessageId fetchLatestOffset() {
+    try {
+      return pulsarConsumer.get().getLastMessageId();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to fetch latest messageId for topic 
'%s'", topicName), e);
+      throw new HoodieIOException("Failed to fetch latest messageId for 
topic", e);
+    }
+  }
+
+  private Consumer<byte[]> subscribeToTopic() {
+    try {
+      // NOTE: We're generating unique subscription-id to make sure that 
subsequent invocation
+      //       of the DS, do not interfere w/ each other
+      String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, 
System.currentTimeMillis());
+      return pulsarClient.get()
+          .newConsumer()
+          .topic(topicName)
+          .subscriptionName(subscriptionId)
+          .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+          .subscriptionType(SubscriptionType.Exclusive)
+          .subscribe();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", 
topicName), e);
+      throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
+    }
+  }
+
+  private PulsarClient initPulsarClient() {
+    try {
+      return PulsarClient.builder()
+          .serviceUrl(serviceEndpointURL)
+          .build();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to init Pulsar client connecting to 
'%s'", serviceEndpointURL), e);
+      throw new HoodieIOException("Failed to init Pulsar client", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdownPulsarClient(pulsarClient.get());
+  }
+
+  private static Long computeTargetRecordLimit(long sourceLimit, 
TypedProperties props) {
+    if (sourceLimit < Long.MAX_VALUE) {
+      return sourceLimit;
+    } else {
+      return 
props.getLong(Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.key(),
+          
Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.defaultValue());
+    }
+  }
+
+  private static String convertToOffsetString(String topic, MessageId 
startingOffset) {
+    return JsonUtils.topicOffsets(
+        HoodieConversionUtils.mapAsScalaImmutableMap(
+            Collections.singletonMap(topic, startingOffset)));
+  }
+
+  private static void shutdownPulsarClient(PulsarClient client) throws 
PulsarClientException {
+    client.close();
+    // NOTE: Current version of Pulsar's client (in Pulsar Spark Connector 
3.1.1.4) is not
+    //       shutting down event-loop group properly, so we had to shut it 
down manually
+    try {
+      EventLoopGroup eventLoopGroup = ((PulsarClientImpl) 
client).eventLoopGroup();
+      if (eventLoopGroup != null) {
+        eventLoopGroup.shutdownGracefully().await();

Review Comment:
   Good catch!



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);

Review Comment:
   Good call. First of all, we currently pass as the ending offset the one that 
is the latest available to read, so technically if Pulsar reports it as available 
we should be able to read it. There could be intermittent failures, but those 
should surface as _failures_, which should prevent the pipeline from committing 
the batch.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = 
Config.OffsetAutoResetStrategy.valueOf(
+              
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;
+            case FAIL:
+              throw new IllegalArgumentException("No checkpoint has been 
provided!");
+            default:
+              throw new UnsupportedOperationException("Unsupported offset 
auto-reset strategy");
+          }
+        });
+  }
+
+  private void ackOffset(MessageId latestConsumedOffset) {
+    try {
+      pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", 
latestConsumedOffset, topicName), e);
+      throw new HoodieIOException("Failed to ack message for topic", e);
+    }
+  }
+
+  private MessageId fetchLatestOffset() {
+    try {
+      return pulsarConsumer.get().getLastMessageId();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to fetch latest messageId for topic 
'%s'", topicName), e);
+      throw new HoodieIOException("Failed to fetch latest messageId for 
topic", e);
+    }
+  }
+
+  private Consumer<byte[]> subscribeToTopic() {

Review Comment:
   Yeah, I thought about wrapping all interactions into some sort of internal 
client. Decided to punt on that for now until we have a clearer case for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to