tisonkun commented on code in PR #20584:
URL: https://github.com/apache/flink/pull/20584#discussion_r981897034


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java:
##########
@@ -27,27 +28,55 @@
 import java.util.List;
 
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MIN_RANGE;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * This range generator would divide the range by the flink source parallelism. It would be the
  * default implementation for {@link SubscriptionType#Key_Shared} subscription.
  */
-public class UniformRangeGenerator implements RangeGenerator {
-    private static final long serialVersionUID = -7292650922683609268L;
+@PublicEvolving
+public class SplitRangeGenerator implements RangeGenerator {
+    private static final long serialVersionUID = -8682286436352905249L;
+
+    private final int start;
+    private final int end;
+
+    public SplitRangeGenerator() {
+        this(MIN_RANGE, MAX_RANGE);
+    }
+
+    public SplitRangeGenerator(int start, int end) {
+        checkArgument(
+                start >= MIN_RANGE,
+                "Start range should be equal to or great than the min range " + MIN_RANGE);
+        checkArgument(
+                end <= MAX_RANGE, "End range should below or less than the max range " + MAX_RANGE);
+        checkArgument(start <= end, "Start range should be equal to or less than the end range");
+
+        this.start = start;
+        this.end = end;
+    }
 
     @Override
     public List<TopicRange> range(TopicMetadata metadata, int parallelism) {

Review Comment:
   This change looks correct to me. However, you should add a test case for `parallelism = 1` to guard this edge case.
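To make the `parallelism = 1` edge case concrete, here is a minimal, self-contained sketch of splitting an inclusive hash range into `parallelism` contiguous sub-ranges. It is not the code from this PR: `RangeSplitSketch` and `split` are hypothetical names, and it assumes `MIN_RANGE = 0` and `MAX_RANGE = 65535` (Pulsar's Key_Shared hash space). With `parallelism = 1`, the single sub-range must be exactly `[start, end]`, which is what the test should assert.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a uniform range split; NOT the SplitRangeGenerator from this PR. */
final class RangeSplitSketch {

    // Assumed to mirror TopicRange.MIN_RANGE / MAX_RANGE (Pulsar's 0..65535 hash space).
    static final int MIN_RANGE = 0;
    static final int MAX_RANGE = 65535;

    /** Returns each sub-range as a two-element array {start, end}, both inclusive. */
    static List<int[]> split(int start, int end, int parallelism) {
        if (start < MIN_RANGE || end > MAX_RANGE || start > end) {
            throw new IllegalArgumentException("Invalid range [" + start + ", " + end + "]");
        }
        long size = (long) end - start + 1;
        if (parallelism < 1 || parallelism > size) {
            throw new IllegalArgumentException("Invalid parallelism " + parallelism);
        }
        List<int[]> ranges = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            // Proportional boundaries; long arithmetic avoids int overflow.
            // hi of sub-range i is always lo of sub-range i + 1 minus one, so there are no gaps.
            int lo = (int) (start + size * i / parallelism);
            int hi = (int) (start + size * (i + 1) / parallelism - 1);
            ranges.add(new int[] {lo, hi});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Edge case from the review: parallelism = 1 must yield the whole range.
        List<int[]> single = split(MIN_RANGE, MAX_RANGE, 1);
        System.out.println(single.size() + " " + single.get(0)[0] + " " + single.get(0)[1]);
    }
}
```

Running `main` prints `1 0 65535`. In the real test, the same assertion would be made on the list returned by `SplitRangeGenerator#range(TopicMetadata, int)` with `parallelism = 1`.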



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java:
##########
@@ -27,27 +28,55 @@
 import java.util.List;
 
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MIN_RANGE;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * This range generator would divide the range by the flink source parallelism. It would be the
  * default implementation for {@link SubscriptionType#Key_Shared} subscription.
  */
-public class UniformRangeGenerator implements RangeGenerator {
-    private static final long serialVersionUID = -7292650922683609268L;
+@PublicEvolving
+public class SplitRangeGenerator implements RangeGenerator {
+    private static final long serialVersionUID = -8682286436352905249L;
+
+    private final int start;
+    private final int end;
+
+    public SplitRangeGenerator() {
+        this(MIN_RANGE, MAX_RANGE);
+    }
+
+    public SplitRangeGenerator(int start, int end) {

Review Comment:
   Should this be annotated with `@VisibleForTesting`? I guess this constructor is only meant for testing.



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java:
##########
@@ -18,23 +18,40 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 
 import java.util.List;
 
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.validateTopicRanges;
+
 /** Always return the same range set for all topics. */
+@PublicEvolving
 public class FixedRangeGenerator implements RangeGenerator {

Review Comment:
   Is this unused, or just not tested?



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java:
##########
@@ -51,7 +51,18 @@ public interface RangeGenerator extends Serializable {
      */
     List<TopicRange> range(TopicMetadata metadata, int parallelism);
 
-    /** Initialize some extra resource when bootstrap the source. */
+    /**
+     * Defines the default behavior for Key_Shared subscription in Flink. See {@link KeySharedMode}
+     * for the detailed usage of the key share mode.
+     *
+     * @param metadata The metadata of the topic.
+     * @param parallelism The reader size for this topic.
+     */
+    default KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {

Review Comment:
   It seems that no implementation uses these parameters.



##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.NONE_KEY;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.keyBytesHash;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.keyHash;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.validateTopicRanges;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Pulsar didn't expose the key hash range method. We have to provide an implementation for
+ * end-user. You can add the keys you want to consume, no need to provide any hash ranges.
+ *
+ * <p>Since the key's hash isn't specified to only one key. The consuming results may contain the
+ * messages with different keys comparing the keys you have defined in this range generator.
+ * Remember to use flink's <code>DataStream.filter()</code> method.
+ *
+ * <p>Usage: <code><pre>
+ * FixedKeysRangeGenerator.builder()
+ *     .supportNullKey()
+ *     .key("someKey")
+ *     .keys(Arrays.asList("key1", "key2"))
+ *     .build()
+ * </pre></code>
+ */
+@PublicEvolving
+public class FixedKeysRangeGenerator implements RangeGenerator {
+    private static final long serialVersionUID = 2372969466289052100L;
+
+    private final List<TopicRange> ranges;
+    private final KeySharedMode sharedMode;
+
+    private FixedKeysRangeGenerator(List<TopicRange> ranges, KeySharedMode sharedMode) {
+        this.ranges = ranges;
+        this.sharedMode = sharedMode;
+    }
+
+    @Override
+    public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
+        return ranges;
+    }
+
+    @Override
+    public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {
+        return sharedMode;
+    }
+
+    public static FixedKeysRangeGeneratorBuilder builder() {
+        return new FixedKeysRangeGeneratorBuilder();
+    }
+
+    /** The builder for {@link FixedKeysRangeGenerator}. */
+    @PublicEvolving
+    public static class FixedKeysRangeGeneratorBuilder {
+
+        private final SortedSet<Integer> keyHashes = new TreeSet<>();
+        private KeySharedMode sharedMode = KeySharedMode.JOIN;
+
+        private FixedKeysRangeGeneratorBuilder() {
+            // No public for builder
+        }
+
+        /**
+         * Some {@link Message} in Pulsar may not have {@link Message#getOrderingKey()} or {@link
+         * Message#getKey()}, use this method for supporting consuming such messages.
+         */
+        public FixedKeysRangeGeneratorBuilder supportNullKey() {
+            keyHashes.add(keyHash(NONE_KEY));
+            return this;
+        }
+
+        /**
+         * If you set the message key by using {@link PulsarMessageBuilder#key(String)} or {@link
+         * TypedMessageBuilder#key(String)}, use this method for supporting consuming such messages.
+         */
+        public FixedKeysRangeGeneratorBuilder key(String key) {
+            keyHashes.add(keyHash(key));
+            return this;
+        }
+
+        /** Same as the {@link #key(String)}, support setting multiple keys in the same time. */
+        public FixedKeysRangeGeneratorBuilder keys(Collection<String> someKeys) {
+            checkNotNull(someKeys);
+            for (String someKey : someKeys) {
+                keyHashes.add(keyHash(someKey));
+            }
+            return this;
+        }
+
+        /**
+         * If you set the message key by using {@link TypedMessageBuilder#keyBytes(byte[])}, use
+         * this method for supporting consuming such messages.
+         */
+        public FixedKeysRangeGeneratorBuilder keyBytes(byte[] keyBytes) {
+            keyHashes.add(keyBytesHash(keyBytes));
+            return this;
+        }
+
+        /**
+         * Pulsar's ordering key is prior to the message key. If you set the ordering key by using
+         * {@link PulsarMessageBuilder#orderingKey(byte[])} or {@link
+         * TypedMessageBuilder#orderingKey(byte[])}, use this method for supporting consuming such
+         * messages.
+         */
+        public FixedKeysRangeGeneratorBuilder orderingKey(byte[] keyBytes) {
+            keyHashes.add(keyHash(keyBytes));
+            return this;
+        }
+
+        /** Override the default {@link KeySharedMode#JOIN} to the mode your have provided. */
+        public FixedKeysRangeGeneratorBuilder keySharedMode(KeySharedMode sharedMode) {
+            this.sharedMode = sharedMode;
+            return this;
+        }

Review Comment:
   Why would we allow a different mode here? I think that when consuming a fixed key range we should always use `JOIN`, unless the fixed range is the full range? In that case, users should use `FullRangeGenerator` or `SplitRangeGenerator` instead.
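Whether `JOIN` is the only sensible mode depends on whether the fixed key hashes can ever add up to the full hash space. A minimal sketch of that check, assuming each key hash is a single point in `0..65535` and that consecutive hashes from the builder's `SortedSet<Integer>` get merged into contiguous ranges. The `build()` method is not part of this hunk, so `FixedKeysRangeSketch`, `toRanges`, and `isFullRange` are hypothetical names, not the PR's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical sketch; NOT the PR's FixedKeysRangeGenerator.build() implementation. */
final class FixedKeysRangeSketch {

    // Assumed to mirror TopicRange.MIN_RANGE / MAX_RANGE (Pulsar's 0..65535 hash space).
    static final int MIN_RANGE = 0;
    static final int MAX_RANGE = 65535;

    /** Merges sorted, distinct hashes into inclusive {start, end} ranges, joining consecutive values. */
    static List<int[]> toRanges(SortedSet<Integer> hashes) {
        List<int[]> ranges = new ArrayList<>();
        int[] current = null;
        for (int h : hashes) {
            if (current != null && h == current[1] + 1) {
                current[1] = h; // extend the range that is already in the list
            } else {
                current = new int[] {h, h};
                ranges.add(current);
            }
        }
        return ranges;
    }

    /** True only if the merged ranges collapse to the single full range [MIN_RANGE, MAX_RANGE]. */
    static boolean isFullRange(List<int[]> ranges) {
        return ranges.size() == 1
                && ranges.get(0)[0] == MIN_RANGE
                && ranges.get(0)[1] == MAX_RANGE;
    }

    public static void main(String[] args) {
        SortedSet<Integer> hashes = new TreeSet<>(List.of(7, 3, 4, 5, 100));
        for (int[] r : toRanges(hashes)) {
            System.out.println(r[0] + "-" + r[1]);
        }
    }
}
```

Running `main` prints `3-5`, `7-7`, and `100-100` on separate lines. A handful of keys practically never produces `isFullRange(...) == true`, which supports always using `JOIN` for this generator.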


