AHeise commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r686797785



##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.split;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link SourceSplit} implementation for a Pulsar's partition. */
+@Internal
+public class PulsarPartitionSplit implements SourceSplit {
+
+    private final TopicPartition partition;
+
+    private final StartCursor startCursor;
+
+    private final StopCursor stopCursor;
+
+    @Nullable private final MessageId latestConsumedId;

Review comment:
       This field can be `transient`. We don't need to serialize it; it's only used in
`PulsarOrderedSourceReader#snapshotState`.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** The state class for recording the split assignment. */
+@Internal
+public class SplitsAssignmentState implements Serializable {
+    private static final long serialVersionUID = -3244274150389546170L;
+
+    private final SerializableSupplier<StartCursor> startCursorSupplier;
+    private final SerializableSupplier<StopCursor> stopCursorSupplier;
+    private final SourceConfiguration sourceConfiguration;
+
+    // The dynamic states for checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
+    private final Map<String, Set<Integer>> splitReaderMapping;
+    private boolean initialized;
+
+    public SplitsAssignmentState(
+            SerializableSupplier<StartCursor> startCursorSupplier,
+            SerializableSupplier<StopCursor> stopCursorSupplier,
+            SourceConfiguration sourceConfiguration) {
+        this.startCursorSupplier = startCursorSupplier;
+        this.stopCursorSupplier = stopCursorSupplier;
+        this.sourceConfiguration = sourceConfiguration;
+        this.appendedPartitions = new HashSet<>();
+        this.pendingPartitionSplits = new HashSet<>();
+        this.splitReaderMapping = new HashMap<>();
+        this.initialized = false;
+    }
+
+    public SplitsAssignmentState(
+            SerializableSupplier<StartCursor> startCursorSupplier,
+            SerializableSupplier<StopCursor> stopCursorSupplier,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.startCursorSupplier = startCursorSupplier;
+        this.stopCursorSupplier = stopCursorSupplier;
+        this.sourceConfiguration = sourceConfiguration;
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.pendingPartitionSplits = 
sourceEnumState.getPendingPartitionSplits();
+        this.splitReaderMapping = sourceEnumState.getSplitReaderMapping();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(
+                appendedPartitions, pendingPartitionSplits, 
splitReaderMapping, initialized);
+    }
+
+    /**
+     * Append the new fetched partitions to current state. We would generate 
pending source split
+     * for downstream pulsar readers. Since the {@link SplitEnumeratorContext} 
don't support put the
+     * split back to enumerator, we don't support partition deletion.
+     *
+     * @param fetchedPartitions The partitions from the {@link 
PulsarSubscriber}.
+     */
+    public void appendTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        for (TopicPartition fetchedPartition : fetchedPartitions) {
+            if (!appendedPartitions.contains(fetchedPartition)) {
+                // This is a new topic partition.
+                PulsarPartitionSplit split =
+                        new PulsarPartitionSplit(
+                                fetchedPartition,
+                                startCursorSupplier.get(),

Review comment:
       To avoid the `SerializableSupplier` indirection, we can keep a single `startCursor` and give each new split its own copy with
`SerializationUtils.clone(startCursor)` here.
   
   Same for `stopCursor`.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.split;
+
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.deserializeBytes;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.deserializeObject;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.serializeBytes;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.serializeObject;
+
+/** The {@link SimpleVersionedSerializer serializer} for {@link 
PulsarPartitionSplit}. */
+public class PulsarPartitionSplitSerializer
+        implements SimpleVersionedSerializer<PulsarPartitionSplit> {
+
+    public static final PulsarPartitionSplitSerializer INSTANCE =
+            new PulsarPartitionSplitSerializer();
+
+    // This version should be bumped after modifying the PulsarPartitionSplit.
+    public static final int CURRENT_VERSION = 0;
+
+    private PulsarPartitionSplitSerializer() {
+        // Singleton instance.
+    }
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(PulsarPartitionSplit obj) throws IOException {
+        // VERSION 0 serialization
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            serializePulsarPartitionSplit(out, obj);
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public PulsarPartitionSplit deserialize(int version, byte[] serialized) 
throws IOException {
+        // VERSION 0 deserialization
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                DataInputStream in = new DataInputStream(bais)) {
+            return deserializePulsarPartitionSplit(version, in);
+        }
+    }
+
+    // ----------------- helper methods --------------
+
+    public void serializePulsarPartitionSplit(DataOutputStream out, 
PulsarPartitionSplit split)
+            throws IOException {
+        // partition
+        serializeTopicPartition(out, split.getPartition());
+
+        // startCursor
+        serializeObject(out, split.getStartCursor());
+
+        // stopCursor
+        serializeObject(out, split.getStopCursor());
+
+        // latestConsumedId
+        MessageId latestConsumedId = split.getLatestConsumedId();
+        if (latestConsumedId == null) {
+            out.writeBoolean(false);
+        } else {
+            out.writeBoolean(true);
+            byte[] bytes = latestConsumedId.toByteArray();
+            serializeBytes(out, bytes);
+        }

Review comment:
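       We don't need to serialize `latestConsumedId` anymore, so this block can be removed (together with the matching read on the deserialization side).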




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to