This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a41386fda2 [HUDI-4616] Adding `PulsarSource` to `DeltaStreamer` to support ingesting from Apache Pulsar (#6386)
a41386fda2 is described below

commit a41386fda238c7c52fd4abd6a72fc107eb992ced
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Aug 18 12:43:44 2022 -0700

    [HUDI-4616] Adding `PulsarSource` to `DeltaStreamer` to support ingesting from Apache Pulsar (#6386)
    
    - Adding PulsarSource to DeltaStreamer to support ingesting from Apache Pulsar.
    - Current implementation of PulsarSource is relying on "pulsar-spark-connector" to ingest using Spark instead of building similar pipeline from scratch.
---
 .../org/apache/hudi/HoodieConversionUtils.scala    |  13 +
 .../org/apache/hudi/common/util/ThreadUtils.java   |  26 +-
 hudi-utilities/pom.xml                             |   8 +
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   9 +-
 .../deltastreamer/SourceFormatAdapter.java         |  17 +-
 .../hudi/utilities/sources/PulsarSource.java       | 297 +++++++++++++++++++++
 pom.xml                                            |   7 +
 7 files changed, 363 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
index 547c6aed62..82c65705fb 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
@@ -20,8 +20,21 @@ package org.apache.hudi
 
 import org.apache.hudi.common.config.TypedProperties
 
+import java.{util => ju}
+import scala.collection.JavaConverters
+
 object HoodieConversionUtils {
 
+  /**
+   * Converts Java's [[ju.Map]] into Scala's (immutable) [[Map]] (by default 
[[JavaConverters]] convert to
+   * a mutable one)
+   */
+  def mapAsScalaImmutableMap[K, V](map: ju.Map[K, V]): Map[K, V] = {
+    // NOTE: We have to use deprecated [[JavaConversions]] to stay compatible 
w/ Scala 2.11
+    import scala.collection.JavaConversions.mapAsScalaMap
+    map.toMap
+  }
+
   def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
     if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else 
org.apache.hudi.common.util.Option.empty()
 
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java
similarity index 57%
copy from hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
copy to hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java
index 547c6aed62..aef791aa87 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java
@@ -16,22 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.hudi
+package org.apache.hudi.common.util;
 
-import org.apache.hudi.common.config.TypedProperties
+import java.util.Arrays;
+import java.util.List;
 
-object HoodieConversionUtils {
+public class ThreadUtils {
 
-  def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
-    if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else 
org.apache.hudi.common.util.Option.empty()
+  /**
+   * Fetches all active threads currently running in the JVM
+   */
+  public static List<Thread> collectActiveThreads() {
+    ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+    while (threadGroup.getParent() != null) {
+      threadGroup = threadGroup.getParent();
+    }
 
-  def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
-    if (opt.isPresent) Some(opt.get) else None
+    Thread[] activeThreads = new Thread[threadGroup.activeCount()];
+    threadGroup.enumerate(activeThreads);
 
-  def toProperties(params: Map[String, String]): TypedProperties = {
-    val props = new TypedProperties()
-    params.foreach(kv => props.setProperty(kv._1, kv._2))
-    props
+    return Arrays.asList(activeThreads);
   }
 
 }
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 322297dfda..da5a42749b 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -171,6 +171,14 @@
       </exclusions>
     </dependency>
 
+    <!-- Pulsar Spark Connector -->
+    <dependency>
+      <groupId>io.streamnative.connectors</groupId>
+      <artifactId>pulsar-spark-connector_${scala.binary.version}</artifactId>
+      <version>${pulsar.spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Logging -->
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 3c1d1f5ef1..c108a1d7a1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -91,6 +91,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -124,7 +125,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
 /**
  * Sync's one batch of data to hoodie table.
  */
-public class DeltaSync implements Serializable {
+public class DeltaSync implements Serializable, Closeable {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
@@ -895,11 +896,15 @@ public class DeltaSync implements Serializable {
    * Close all resources.
    */
   public void close() {
-    if (null != writeClient) {
+    if (writeClient != null) {
       writeClient.close();
       writeClient = null;
     }
 
+    if (formatAdapter != null) {
+      formatAdapter.close();
+    }
+
     LOG.info("Shutting down embedded timeline server");
     if (embeddedTimelineService.isPresent()) {
       embeddedTimelineService.get().stop();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 1260acb1ce..3514ace829 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -38,13 +39,16 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 
 /**
  * Adapts data-format provided by the source to the data-format required by 
the client (DeltaStreamer).
  */
-public final class SourceFormatAdapter {
+public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
 
@@ -123,4 +127,15 @@ public final class SourceFormatAdapter {
   public Source getSource() {
     return source;
   }
+
+  @Override
+  public void close() {
+    if (source instanceof Closeable) {
+      try {
+        ((Closeable) source).close();
+      } catch (IOException e) {
+        throw new HoodieIOException(String.format("Failed to shutdown the 
source (%s)", source.getClass().getName()), e);
+      }
+    }
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
new file mode 100644
index 0000000000..dbfd28f806
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final Duration GRACEFUL_SHUTDOWN_TIMEOUT = 
Duration.ofSeconds(20);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = 
"hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name 
properly
+    this.topicName = 
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate endpoints provided in the appropriate format
+    this.serviceEndpointURL = 
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = 
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = 
computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, 
startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = 
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> 
lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) is preceding starting 
offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the amount of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> 
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = 
Config.OffsetAutoResetStrategy.valueOf(
+              
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;
+            case FAIL:
+              throw new IllegalArgumentException("No checkpoint has been 
provided!");
+            default:
+              throw new UnsupportedOperationException("Unsupported offset 
auto-reset strategy");
+          }
+        });
+  }
+
+  private void ackOffset(MessageId latestConsumedOffset) {
+    try {
+      pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", 
latestConsumedOffset, topicName), e);
+      throw new HoodieIOException("Failed to ack message for topic", e);
+    }
+  }
+
+  private MessageId fetchLatestOffset() {
+    try {
+      return pulsarConsumer.get().getLastMessageId();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to fetch latest messageId for topic 
'%s'", topicName), e);
+      throw new HoodieIOException("Failed to fetch latest messageId for 
topic", e);
+    }
+  }
+
+  private Consumer<byte[]> subscribeToTopic() {
+    try {
+      // NOTE: We're generating unique subscription-id to make sure that 
subsequent invocation
+      //       of the DS, do not interfere w/ each other
+      String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, 
System.currentTimeMillis());
+      return pulsarClient.get()
+          .newConsumer()
+          .topic(topicName)
+          .subscriptionName(subscriptionId)
+          .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+          .subscriptionType(SubscriptionType.Exclusive)
+          .subscribe();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", 
topicName), e);
+      throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
+    }
+  }
+
+  private PulsarClient initPulsarClient() {
+    try {
+      return PulsarClient.builder()
+          .serviceUrl(serviceEndpointURL)
+          .build();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to init Pulsar client connecting to 
'%s'", serviceEndpointURL), e);
+      throw new HoodieIOException("Failed to init Pulsar client", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdownPulsarClient(pulsarClient.get());
+  }
+
+  private static Long computeTargetRecordLimit(long sourceLimit, 
TypedProperties props) {
+    if (sourceLimit < Long.MAX_VALUE) {
+      return sourceLimit;
+    } else {
+      return 
props.getLong(Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.key(),
+          
Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.defaultValue());
+    }
+  }
+
+  private static String convertToOffsetString(String topic, MessageId 
startingOffset) {
+    return JsonUtils.topicOffsets(
+        HoodieConversionUtils.mapAsScalaImmutableMap(
+            Collections.singletonMap(topic, startingOffset)));
+  }
+
+  private static void shutdownPulsarClient(PulsarClient client) throws 
PulsarClientException {
+    client.close();
+    // NOTE: Current version of Pulsar's client (in Pulsar Spark Connector 
3.1.1.4) is not
+    //       shutting down event-loop group properly, so we had to shut it 
down manually
+    try {
+      EventLoopGroup eventLoopGroup = ((PulsarClientImpl) 
client).eventLoopGroup();
+      if (eventLoopGroup != null) {
+        eventLoopGroup.shutdownGracefully()
+            .await(GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
+      }
+    } catch (InterruptedException e) {
+      // No-op
+    }
+
+    // NOTE: Pulsar clients initialized by the spark-connector, might be left 
not shutdown
+    //       properly (see above). To work this around we employ "nuclear" 
option of
+    //       fetching all Pulsar client threads and interrupting them forcibly 
(to make them
+    //       shutdown)
+    collectActiveThreads().stream().sequential()
+        .filter(t -> t.getName().startsWith("pulsar-client-io"))
+        .forEach(Thread::interrupt);
+  }
+
+  public static class Config {
+    private static final ConfigProperty<String> PULSAR_SOURCE_TOPIC_NAME = 
ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.topic")
+        .noDefaultValue()
+        .withDocumentation("Name of the target Pulsar topic to source data 
from");
+
+    private static final ConfigProperty<String> 
PULSAR_SOURCE_SERVICE_ENDPOINT_URL = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.endpoint.service.url")
+        .defaultValue("pulsar://localhost:6650")
+        .withDocumentation("URL of the target Pulsar endpoint (of the form 
'pulsar://host:port'");
+
+    private static final ConfigProperty<String> 
PULSAR_SOURCE_ADMIN_ENDPOINT_URL = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.endpoint.admin.url")
+        .defaultValue("http://localhost:8080")
+        .withDocumentation("URL of the target Pulsar endpoint (of the form 
'pulsar://host:port'");
+
+    public enum OffsetAutoResetStrategy {
+      LATEST, EARLIEST, FAIL
+    }
+
+    private static final ConfigProperty<OffsetAutoResetStrategy> 
PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy")
+        .defaultValue(OffsetAutoResetStrategy.LATEST)
+        .withDocumentation("Policy determining how offsets shall be 
automatically reset in case there's "
+            + "no checkpoint information present");
+
+    public static final ConfigProperty<Long> 
PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.maxRecords")
+        .defaultValue(5_000_000L)
+        .withDocumentation("Max number of records obtained in a single each 
batch");
+  }
+}
diff --git a/pom.xml b/pom.xml
index 66a9335937..1f9550218c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,9 @@
     <kafka.version>2.0.0</kafka.version>
     <kafka.spark3.version>2.4.1</kafka.spark3.version>
     <pulsar.version>2.8.1</pulsar.version>
+    <pulsar.spark.version>${pulsar.spark.scala11.version}</pulsar.spark.version>
+    <pulsar.spark.scala11.version>2.4.5</pulsar.spark.scala11.version>
+    <pulsar.spark.scala12.version>3.1.1.4</pulsar.spark.scala12.version>
     <confluent.version>5.3.4</confluent.version>
     <glassfish.version>2.17</glassfish.version>
     <glassfish.el.version>3.0.1-b12</glassfish.el.version>
@@ -1602,12 +1605,16 @@
     <!-- Exists for backwards compatibility; profile doesn't do anything -->
     <profile>
       <id>scala-2.11</id>
+      <properties>
+        <pulsar.spark.version>${pulsar.spark.scala11.version}</pulsar.spark.version>
+      </properties>
     </profile>
     <profile>
       <id>scala-2.12</id>
       <properties>
         <scala.version>${scala12.version}</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
+        <pulsar.spark.version>${pulsar.spark.scala12.version}</pulsar.spark.version>
       </properties>
       <activation>
         <property>

Reply via email to