syhily commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r632034934



##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.util.PulsarAdminUtils;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users to construct a
+ * {@link PulsarSource}.
+ */
+@PublicEvolving
+public class PulsarSourceBuilder<OUT> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class);
+    // The subscriber specifies the partitions to subscribe to.
+    private PulsarSubscriber subscriber;
+    // Users can specify the starting / stopping offset initializer.
+    private StartOffsetInitializer startOffsetInitializer = StartOffsetInitializer.earliest();
+    private StopCondition stopCondition = StopCondition.never();
+    // Boundedness
+    private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+    private MessageDeserializer<OUT> messageDeserializer;
+    private SplitSchedulingStrategy splitSchedulingStrategy;
+    // The configurations.
+    private Configuration configuration = new Configuration();
+
+    private ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
+    private ConsumerConfigurationData<byte[]> consumerConfigurationData =
+            new ConsumerConfigurationData<>();
+
+    PulsarSourceBuilder() {
+        consumerConfigurationData.setSubscriptionMode(SubscriptionMode.NonDurable);
+        consumerConfigurationData.setSubscriptionType(SubscriptionType.Exclusive);
+        consumerConfigurationData.setSubscriptionName("flink-" + UUID.randomUUID());
+    }
+
+    public PulsarSourceBuilder<OUT> setTopics(
+            SplitDivisionStrategy splitDivisionStrategy, String... topics) {
+        TreeSet<String> topicNames = Sets.newTreeSet(Arrays.asList(topics));
+        consumerConfigurationData.setTopicNames(topicNames);
+        return setSubscriber(
+                PulsarSubscriber.getTopicListSubscriber(splitDivisionStrategy, topics));

Review comment:
       Yeah, I think this is an abstraction leak. The Pulsar connector uses a `TopicListSubscriber` just like the Kafka connector does, but putting the topics into the consumer configuration merely leans on the Pulsar client's own consuming ability. This should be changed; thanks for your detailed review.
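
       As a minimal, hypothetical sketch of the decoupling (method shape follows the existing builder; the point is simply that the `consumerConfigurationData.setTopicNames(...)` call disappears):

```java
// Sketch: the subscriber becomes the single owner of the topic list;
// the consumer configuration no longer carries topic names.
public PulsarSourceBuilder<OUT> setTopics(
        SplitDivisionStrategy splitDivisionStrategy, String... topics) {
    return setSubscriber(
            PulsarSubscriber.getTopicListSubscriber(splitDivisionStrategy, topics));
}
```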

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/util/CachedPulsarClient.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.util;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.shade.com.google.common.cache.CacheBuilder;
+import org.apache.pulsar.shade.com.google.common.cache.CacheLoader;
+import org.apache.pulsar.shade.com.google.common.cache.LoadingCache;
+import org.apache.pulsar.shade.com.google.common.cache.RemovalListener;
+import org.slf4j.Logger;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+
+/** Enables sharing of the same PulsarClient among tasks in the same process. */
+public class CachedPulsarClient {
+
+    private static final Logger log = org.slf4j.LoggerFactory.getLogger(CachedPulsarClient.class);
+
+    private static int cacheSize = 100;
+
+    public static void setCacheSize(int newSize) {
+        cacheSize = newSize;
+    }
+
+    public static int getCacheSize() {
+        return cacheSize;
+    }

Review comment:
       The cache size can be tuned by the user; see the sketch below.
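
       For example (only the call site is a sketch; `setCacheSize` itself is in the diff above, and presumably it must run before the first client is requested, since a Guava cache is typically sized once at construction):

```java
// Sketch: enlarge the per-process client cache before the first
// PulsarClient is requested, e.g. when one process talks to many
// differently-configured Pulsar clusters.
CachedPulsarClient.setCacheSize(200);
```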

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Configurations for PulsarSource. */
+public class PulsarSourceOptions {
+
+    public static final ConfigOption<String> ADMIN_URL =
+            ConfigOptions.key("admin.url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The url to Pulsar admin.");
+
+    public static final ConfigOption<Long> PARTITION_DISCOVERY_INTERVAL_MS =
+            ConfigOptions.key("partition.discovery.interval.ms")
+                    .longType()
+                    .defaultValue(30000L)

Review comment:
       Yeah, it is used in split discovery; a configuration sketch follows.
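
       As a sketch of how a user might tune it (how the `Configuration` reaches the builder is omitted here; judging from the enumerator diff below, a non-positive interval results in a single discovery pass instead of periodic discovery):

```java
// Sketch: discover new partitions/splits every 10 seconds instead of
// the 30-second default.
Configuration configuration = new Configuration();
configuration.set(PulsarSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS, 10_000L);
```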

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/AbstractPartition.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import java.io.Serializable;
+
+/** The abstraction of a partition, not necessarily the topic partition in Pulsar. */
+public abstract class AbstractPartition implements Serializable {
+    protected final String topic;
+    protected final PartitionType partitionType;
+
+    public AbstractPartition(String topic, PartitionType partitionType) {
+        this.topic = topic;
+        this.partitionType = partitionType;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public PartitionType getPartitionType() {
+        return partitionType;
+    }
+
+    /** Represents partition type in pulsar. */
+    public enum PartitionType {
+        Broker,
+        Bookie,
+        Offload

Review comment:
       Yep, I will remove it in this PR. This enum class was reserved for Pulsar's data storage design, and we could later provide a better batch reading policy based on this configuration.

##########
File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.AbstractPartition;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.SplitSchedulingStrategy;
+import org.apache.flink.connector.pulsar.source.StartOffsetInitializer;
+import org.apache.flink.connector.pulsar.source.StopCondition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.pulsar.source.util.ComponentClosingUtils.closeWithTimeout;
+
+/** The enumerator class for the Pulsar source. */
+public class PulsarSourceEnumerator
+        implements SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);
+    private final PulsarSubscriber subscriber;
+    private final StartOffsetInitializer startOffsetInitializer;
+    private final StopCondition stopCondition;
+    private final PulsarAdmin pulsarAdmin;
+    private final Configuration configuration;
+    private final long partitionDiscoveryIntervalMs;
+    private final SplitEnumeratorContext<PulsarPartitionSplit> context;
+
+    // The internal states of the enumerator.
+    /**
+     * This set is only accessed by the partition discovery callable in the callAsync() method,
+     * i.e. the worker thread.
+     */
+    private final Set<AbstractPartition> discoveredPartitions;
+    /** The current assignment by reader id. Only accessed by the coordinator thread. */
+    private final Map<Integer, List<PulsarPartitionSplit>> readerIdToSplitAssignments;
+    /**
+     * The discovered and initialized partition splits that are waiting for the owner reader to
+     * be ready.
+     */
+    private final Map<Integer, List<PulsarPartitionSplit>> pendingPartitionSplitAssignment;
+
+    // Lazily instantiated or mutable fields.
+    private boolean noMoreNewPartitionSplits = false;
+
+    private SplitSchedulingStrategy splitSchedulingStrategy;
+
+    public PulsarSourceEnumerator(
+            PulsarSubscriber subscriber,
+            StartOffsetInitializer startOffsetInitializer,
+            StopCondition stopCondition,
+            PulsarAdmin pulsarAdmin,
+            Configuration configuration,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            Map<Integer, List<PulsarPartitionSplit>> currentSplitsAssignments,
+            SplitSchedulingStrategy splitSchedulingStrategy) {
+        this.subscriber = subscriber;
+        this.subscriber.setContext(context);
+        this.startOffsetInitializer = startOffsetInitializer;
+        this.stopCondition = stopCondition;
+        this.pulsarAdmin = pulsarAdmin;
+        this.configuration = configuration;
+        this.context = context;
+        this.splitSchedulingStrategy = splitSchedulingStrategy;
+        discoveredPartitions = new HashSet<>();
+        readerIdToSplitAssignments = new HashMap<>(currentSplitsAssignments);
+        readerIdToSplitAssignments.forEach(
+                (reader, splits) ->
+                        splits.forEach(s -> discoveredPartitions.add(s.getPartition())));
+        pendingPartitionSplitAssignment = new HashMap<>();
+        partitionDiscoveryIntervalMs =
+                configuration.get(PulsarSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
+    }
+
+    @Override
+    public void start() {
+        if (partitionDiscoveryIntervalMs > 0) {
+            context.callAsync(
+                    this::discoverAndInitializePartitionSplit,
+                    this::handlePartitionSplitChanges,
+                    0,
+                    partitionDiscoveryIntervalMs);
+        } else {
+            context.callAsync(
+                    this::discoverAndInitializePartitionSplit, this::handlePartitionSplitChanges);
+        }
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        // The Pulsar source pushes splits eagerly, rather than acting upon split requests.
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        splitSchedulingStrategy.addSplitsBack(
+                pendingPartitionSplitAssignment, splits, subtaskId, context.currentParallelism());
+        assignPendingPartitionSplits();
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug("Adding reader {}.", subtaskId);
+        assignPendingPartitionSplits();
+    }
+
+    @Override
+    public PulsarSourceEnumeratorState snapshotState() throws Exception {
+        return new PulsarSourceEnumeratorState(readerIdToSplitAssignments);
+    }
+
+    @Override
+    public void close() {
+        // When closing the split enumerator, we need to make sure that the async calls are
+        // canceled before we can close the consumer and admin clients.
+        long closeTimeoutMs = configuration.get(PulsarSourceOptions.CLOSE_TIMEOUT_MS);
+        close(closeTimeoutMs)
+                .ifPresent(
+                        t -> LOG.warn("Encountered error when closing PulsarSourceEnumerator", t));
+    }
+
+    @VisibleForTesting
+    Optional<Throwable> close(long timeoutMs) {
+        return closeWithTimeout(
+                "PulsarSourceEnumerator",
+                (ThrowingRunnable<Exception>) () -> pulsarAdmin.close(),
+                timeoutMs);
+    }
+
+    // ----------------- private methods -------------------
+
+    private PartitionSplitChange discoverAndInitializePartitionSplit()
+            throws IOException, InterruptedException, PulsarAdminException {
+        // Make a copy of the partitions to owners
+        PulsarSubscriber.PartitionChange partitionChange =
+                subscriber.getPartitionChanges(
+                        pulsarAdmin, Collections.unmodifiableSet(discoveredPartitions));
+
+        discoveredPartitions.addAll(partitionChange.getNewPartitions());
+
+        List<PulsarPartitionSplit> partitionSplits =
+                partitionChange.getNewPartitions().stream()
+                        .map(
+                                partition ->
+                                        new PulsarPartitionSplit(
+                                                partition, startOffsetInitializer, stopCondition))
+                        .collect(Collectors.toList());
+        return new PartitionSplitChange(partitionSplits, partitionChange.getRemovedPartitions());
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void handlePartitionSplitChanges(
+            PartitionSplitChange partitionSplitChange, Throwable t) {
+        if (t != null) {
+            throw new FlinkRuntimeException("Failed to handle partition splits change", t);
+        }
+        if (partitionDiscoveryIntervalMs < 0) {
+            noMoreNewPartitionSplits = true;
+        }
+        // TODO: Handle removed partitions.
+        addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits);
+        assignPendingPartitionSplits();
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void addPartitionSplitChangeToPendingAssignments(
+            Collection<PulsarPartitionSplit> newPartitionSplits) {
+        int numReaders = context.currentParallelism();
+        for (PulsarPartitionSplit split : newPartitionSplits) {
+            // TODO: Implement a more sophisticated algorithm to reduce partition movement
+            // when parallelism changes.
+            pendingPartitionSplitAssignment
+                    .computeIfAbsent(
+                            splitSchedulingStrategy.getIndexOfReader(numReaders, split),
+                            r -> new ArrayList<>())
+                    .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", newPartitionSplits, 
numReaders);
+    }
+
+    // This method should only be invoked in the coordinator executor thread.
+    private void assignPendingPartitionSplits() {
+        Map<Integer, List<PulsarPartitionSplit>> incrementalAssignment = new HashMap<>();
+        pendingPartitionSplitAssignment.forEach(
+                (ownerReader, pendingSplits) -> {
+                    if (!pendingSplits.isEmpty()
+                            && context.registeredReaders().containsKey(ownerReader)) {
+                        // The owner reader is ready; assign the pending splits to it.
+                        incrementalAssignment
+                                .computeIfAbsent(ownerReader, r -> new ArrayList<>())
+                                .addAll(pendingSplits);
+                    }
+                });
+        if (incrementalAssignment.isEmpty()) {
+            // No assignment is made.
+            return;
+        }
+        context.assignSplits(new SplitsAssignment<>(incrementalAssignment));
+        incrementalAssignment.forEach(
+                (readerOwner, newPartitionSplits) -> {
+                    // Update the split assignment.
+                    readerIdToSplitAssignments
+                            .computeIfAbsent(readerOwner, r -> new ArrayList<>())
+                            .addAll(newPartitionSplits);
+                    // Clear the pending splits for the reader owner.
+                    pendingPartitionSplitAssignment.remove(readerOwner);
+                    // Send NoMoreSplitsEvent to the reader if there are no more partition
+                    // splits to be assigned.
+                    if (noMoreNewPartitionSplits) {
+                        context.signalNoMoreSplits(readerOwner);
+                    }
+                });
+    }
+
+    /** Class that represents a change of partition splits. */
+    public static class PartitionSplitChange {
+        private final List<PulsarPartitionSplit> newPartitionSplits;
+        private final Set<AbstractPartition> removedPartitions;

Review comment:
       It's reserved for the follow-up that handles removed partitions (see the TODO above); a hypothetical sketch follows.
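
       Purely as a hypothetical sketch of that follow-up (nothing here is implemented yet; the method name and its logic are illustrative):

```java
// Hypothetical follow-up for the "Handle removed partitions" TODO:
// forget removed partitions so they can be re-discovered if they come
// back, and drop any still-pending splits that would read from them.
private void handleRemovedPartitions(Set<AbstractPartition> removedPartitions) {
    discoveredPartitions.removeAll(removedPartitions);
    pendingPartitionSplitAssignment
            .values()
            .forEach(splits -> splits.removeIf(s -> removedPartitions.contains(s.getPartition())));
}
```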




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

