This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a41386fda2 [HUDI-4616] Adding `PulsarSource` to `DeltaStreamer` to
support ingesting from Apache Pulsar (#6386)
a41386fda2 is described below
commit a41386fda238c7c52fd4abd6a72fc107eb992ced
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Aug 18 12:43:44 2022 -0700
[HUDI-4616] Adding `PulsarSource` to `DeltaStreamer` to support ingesting
from Apache Pulsar (#6386)
- Adding PulsarSource to DeltaStreamer to support ingesting from Apache
Pulsar.
- Current implementation of PulsarSource is relying on
"pulsar-spark-connector" to ingest using Spark instead of building similar
pipeline from scratch.
---
.../org/apache/hudi/HoodieConversionUtils.scala | 13 +
.../org/apache/hudi/common/util/ThreadUtils.java | 26 +-
hudi-utilities/pom.xml | 8 +
.../hudi/utilities/deltastreamer/DeltaSync.java | 9 +-
.../deltastreamer/SourceFormatAdapter.java | 17 +-
.../hudi/utilities/sources/PulsarSource.java | 297 +++++++++++++++++++++
pom.xml | 7 +
7 files changed, 363 insertions(+), 14 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
index 547c6aed62..82c65705fb 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
@@ -20,8 +20,21 @@ package org.apache.hudi
import org.apache.hudi.common.config.TypedProperties
+import java.{util => ju}
+import scala.collection.JavaConverters
+
object HoodieConversionUtils {
+ /**
+ * Converts Java's [[ju.Map]] into Scala's (immutable) [[Map]] (by default
[[JavaConverters]] converts to
+ * a mutable one)
+ */
+ def mapAsScalaImmutableMap[K, V](map: ju.Map[K, V]): Map[K, V] = {
+ // NOTE: We have to use deprecated [[JavaConversions]] to stay compatible
w/ Scala 2.11
+ import scala.collection.JavaConversions.mapAsScalaMap
+ map.toMap
+ }
+
def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else
org.apache.hudi.common.util.Option.empty()
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
b/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java
similarity index 57%
copy from
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
copy to hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java
index 547c6aed62..aef791aa87 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java
@@ -16,22 +16,26 @@
* limitations under the License.
*/
-package org.apache.hudi
+package org.apache.hudi.common.util;
-import org.apache.hudi.common.config.TypedProperties
+import java.util.Arrays;
+import java.util.List;
-object HoodieConversionUtils {
+public class ThreadUtils {
- def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
- if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else
org.apache.hudi.common.util.Option.empty()
+ /**
+ * Fetches all active threads currently running in the JVM
+ */
+ public static List<Thread> collectActiveThreads() {
+ ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+ while (threadGroup.getParent() != null) {
+ threadGroup = threadGroup.getParent();
+ }
- def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
- if (opt.isPresent) Some(opt.get) else None
+ Thread[] activeThreads = new Thread[threadGroup.activeCount()];
+ threadGroup.enumerate(activeThreads);
- def toProperties(params: Map[String, String]): TypedProperties = {
- val props = new TypedProperties()
- params.foreach(kv => props.setProperty(kv._1, kv._2))
- props
+ return Arrays.asList(activeThreads);
}
}
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 322297dfda..da5a42749b 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -171,6 +171,14 @@
</exclusions>
</dependency>
+ <!-- Pulsar Spark Connector -->
+ <dependency>
+ <groupId>io.streamnative.connectors</groupId>
+ <artifactId>pulsar-spark-connector_${scala.binary.version}</artifactId>
+ <version>${pulsar.spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 3c1d1f5ef1..c108a1d7a1 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -91,6 +91,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -124,7 +125,7 @@ import static
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
/**
* Sync's one batch of data to hoodie table.
*/
-public class DeltaSync implements Serializable {
+public class DeltaSync implements Serializable, Closeable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
@@ -895,11 +896,15 @@ public class DeltaSync implements Serializable {
* Close all resources.
*/
public void close() {
- if (null != writeClient) {
+ if (writeClient != null) {
writeClient.close();
writeClient = null;
}
+ if (formatAdapter != null) {
+ formatAdapter.close();
+ }
+
LOG.info("Shutting down embedded timeline server");
if (embeddedTimelineService.isPresent()) {
embeddedTimelineService.get().stop();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 1260acb1ce..3514ace829 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -38,13 +39,16 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
+import java.io.Closeable;
+import java.io.IOException;
+
import static
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
/**
* Adapts data-format provided by the source to the data-format required by
the client (DeltaStreamer).
*/
-public final class SourceFormatAdapter {
+public final class SourceFormatAdapter implements Closeable {
private final Source source;
@@ -123,4 +127,15 @@ public final class SourceFormatAdapter {
public Source getSource() {
return source;
}
+
+ @Override
+ public void close() {
+ if (source instanceof Closeable) {
+ try {
+ ((Closeable) source).close();
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to shutdown the
source (%s)", source.getClass().getName()), e);
+ }
+ }
+ }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
new file mode 100644
index 0000000000..dbfd28f806
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+ private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+ private static final Duration GRACEFUL_SHUTDOWN_TIMEOUT =
Duration.ofSeconds(20);
+
+ private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT =
"hudi-pulsar-consumer-%d";
+ private static final String[] PULSAR_META_FIELDS = new String[]{
+ "__key",
+ "__topic",
+ "__messageId",
+ "__publishTime",
+ "__eventTime",
+ "__messageProperties"
+ };
+
+ private final String topicName;
+
+ private final String serviceEndpointURL;
+ private final String adminEndpointURL;
+
+ // NOTE: We're keeping the client so that we can shut it down properly
+ private final Lazy<PulsarClient> pulsarClient;
+ private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+ public PulsarSource(TypedProperties props,
+ JavaSparkContext sparkContext,
+ SparkSession sparkSession,
+ SchemaProvider schemaProvider) {
+ super(props, sparkContext, sparkSession, schemaProvider);
+
+ DataSourceUtils.checkRequiredProperties(props,
+ Arrays.asList(
+ Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+ Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+ // Converting to a descriptor allows us to canonicalize the topic's name
properly
+ this.topicName =
TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+ // TODO validate endpoints provided in the appropriate format
+ this.serviceEndpointURL =
props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+ this.adminEndpointURL =
props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+ this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+ this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+ }
+
+ @Override
+ protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCheckpointStr, long sourceLimit) {
+ Pair<MessageId, MessageId> startingEndingOffsetsPair =
computeOffsets(lastCheckpointStr, sourceLimit);
+
+ MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+ MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+ String startingOffsetStr = convertToOffsetString(topicName,
startingOffset);
+ String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+ Dataset<Row> sourceRows = sparkSession.read()
+ .format("pulsar")
+ .option("service.url", serviceEndpointURL)
+ .option("admin.url", adminEndpointURL)
+ .option("topics", topicName)
+ .option("startingOffsets", startingOffsetStr)
+ .option("endingOffsets", endingOffsetStr)
+ .load();
+
+ return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+ }
+
+ @Override
+ public void onCommit(String lastCheckpointStr) {
+ MessageId latestConsumedOffset =
JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+ ackOffset(latestConsumedOffset);
+ }
+
+ private Dataset<Row> transform(Dataset<Row> rows) {
+ return rows.drop(PULSAR_META_FIELDS);
+ }
+
+ private Pair<MessageId, MessageId> computeOffsets(Option<String>
lastCheckpointStrOpt, long sourceLimit) {
+ MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+ MessageId endingOffset = fetchLatestOffset();
+
+ if (endingOffset.compareTo(startingOffset) < 0) {
+ String message = String.format("Ending offset (%s) is preceding starting
offset (%s) for '%s'",
+ endingOffset, startingOffset, topicName);
+ throw new HoodieException(message);
+ }
+
+ // TODO support capping the amount of records fetched
+ Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+ return Pair.of(startingOffset, endingOffset);
+ }
+
+ private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+ return lastCheckpointStrOpt
+ .map(lastCheckpoint ->
JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+ .orElseGet(() -> {
+ Config.OffsetAutoResetStrategy autoResetStrategy =
Config.OffsetAutoResetStrategy.valueOf(
+
props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+
Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+ switch (autoResetStrategy) {
+ case LATEST:
+ return fetchLatestOffset();
+ case EARLIEST:
+ return MessageId.earliest;
+ case FAIL:
+ throw new IllegalArgumentException("No checkpoint has been
provided!");
+ default:
+ throw new UnsupportedOperationException("Unsupported offset
auto-reset strategy");
+ }
+ });
+ }
+
+ private void ackOffset(MessageId latestConsumedOffset) {
+ try {
+ pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
+ } catch (PulsarClientException e) {
+ LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'",
latestConsumedOffset, topicName), e);
+ throw new HoodieIOException("Failed to ack message for topic", e);
+ }
+ }
+
+ private MessageId fetchLatestOffset() {
+ try {
+ return pulsarConsumer.get().getLastMessageId();
+ } catch (PulsarClientException e) {
+ LOG.error(String.format("Failed to fetch latest messageId for topic
'%s'", topicName), e);
+ throw new HoodieIOException("Failed to fetch latest messageId for
topic", e);
+ }
+ }
+
+ private Consumer<byte[]> subscribeToTopic() {
+ try {
+ // NOTE: We're generating unique subscription-id to make sure that
subsequent invocations
+      //       of the DS do not interfere w/ each other
+ String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT,
System.currentTimeMillis());
+ return pulsarClient.get()
+ .newConsumer()
+ .topic(topicName)
+ .subscriptionName(subscriptionId)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+ } catch (PulsarClientException e) {
+ LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'",
topicName), e);
+ throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
+ }
+ }
+
+ private PulsarClient initPulsarClient() {
+ try {
+ return PulsarClient.builder()
+ .serviceUrl(serviceEndpointURL)
+ .build();
+ } catch (PulsarClientException e) {
+ LOG.error(String.format("Failed to init Pulsar client connecting to
'%s'", serviceEndpointURL), e);
+ throw new HoodieIOException("Failed to init Pulsar client", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ shutdownPulsarClient(pulsarClient.get());
+ }
+
+ private static Long computeTargetRecordLimit(long sourceLimit,
TypedProperties props) {
+ if (sourceLimit < Long.MAX_VALUE) {
+ return sourceLimit;
+ } else {
+ return
props.getLong(Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.key(),
+
Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.defaultValue());
+ }
+ }
+
+ private static String convertToOffsetString(String topic, MessageId
startingOffset) {
+ return JsonUtils.topicOffsets(
+ HoodieConversionUtils.mapAsScalaImmutableMap(
+ Collections.singletonMap(topic, startingOffset)));
+ }
+
+ private static void shutdownPulsarClient(PulsarClient client) throws
PulsarClientException {
+ client.close();
+ // NOTE: Current version of Pulsar's client (in Pulsar Spark Connector
3.1.1.4) is not
+ // shutting down event-loop group properly, so we had to shut it
down manually
+ try {
+ EventLoopGroup eventLoopGroup = ((PulsarClientImpl)
client).eventLoopGroup();
+ if (eventLoopGroup != null) {
+ eventLoopGroup.shutdownGracefully()
+ .await(GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e) {
+ // No-op
+ }
+
+ // NOTE: Pulsar clients initialized by the spark-connector, might be left
not shutdown
+ // properly (see above). To work this around we employ "nuclear"
option of
+ // fetching all Pulsar client threads and interrupting them forcibly
(to make them
+ // shutdown)
+ collectActiveThreads().stream().sequential()
+ .filter(t -> t.getName().startsWith("pulsar-client-io"))
+ .forEach(Thread::interrupt);
+ }
+
+ public static class Config {
+ private static final ConfigProperty<String> PULSAR_SOURCE_TOPIC_NAME =
ConfigProperty
+ .key("hoodie.deltastreamer.source.pulsar.topic")
+ .noDefaultValue()
+ .withDocumentation("Name of the target Pulsar topic to source data
from");
+
+ private static final ConfigProperty<String>
PULSAR_SOURCE_SERVICE_ENDPOINT_URL = ConfigProperty
+ .key("hoodie.deltastreamer.source.pulsar.endpoint.service.url")
+ .defaultValue("pulsar://localhost:6650")
+      .withDocumentation("URL of the target Pulsar endpoint (of the form
'pulsar://host:port')");
+
+ private static final ConfigProperty<String>
PULSAR_SOURCE_ADMIN_ENDPOINT_URL = ConfigProperty
+ .key("hoodie.deltastreamer.source.pulsar.endpoint.admin.url")
+ .defaultValue("http://localhost:8080")
+      .withDocumentation("URL of the target Pulsar admin endpoint (of the form
'http://host:port')");
+
+ public enum OffsetAutoResetStrategy {
+ LATEST, EARLIEST, FAIL
+ }
+
+ private static final ConfigProperty<OffsetAutoResetStrategy>
PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY = ConfigProperty
+ .key("hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy")
+ .defaultValue(OffsetAutoResetStrategy.LATEST)
+ .withDocumentation("Policy determining how offsets shall be
automatically reset in case there's "
+ + "no checkpoint information present");
+
+ public static final ConfigProperty<Long>
PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP = ConfigProperty
+ .key("hoodie.deltastreamer.source.pulsar.maxRecords")
+ .defaultValue(5_000_000L)
+      .withDocumentation("Max number of records obtained in a single
batch");
+ }
+}
diff --git a/pom.xml b/pom.xml
index 66a9335937..1f9550218c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,9 @@
<kafka.version>2.0.0</kafka.version>
<kafka.spark3.version>2.4.1</kafka.spark3.version>
<pulsar.version>2.8.1</pulsar.version>
+
<pulsar.spark.version>${pulsar.spark.scala11.version}</pulsar.spark.version>
+ <pulsar.spark.scala11.version>2.4.5</pulsar.spark.scala11.version>
+ <pulsar.spark.scala12.version>3.1.1.4</pulsar.spark.scala12.version>
<confluent.version>5.3.4</confluent.version>
<glassfish.version>2.17</glassfish.version>
<glassfish.el.version>3.0.1-b12</glassfish.el.version>
@@ -1602,12 +1605,16 @@
<!-- Exists for backwards compatibility; profile doesn't do anything -->
<profile>
<id>scala-2.11</id>
+ <properties>
+
<pulsar.spark.version>${pulsar.spark.scala11.version}</pulsar.spark.version>
+ </properties>
</profile>
<profile>
<id>scala-2.12</id>
<properties>
<scala.version>${scala12.version}</scala.version>
<scala.binary.version>2.12</scala.binary.version>
+
<pulsar.spark.version>${pulsar.spark.scala12.version}</pulsar.spark.version>
</properties>
<activation>
<property>