tisonkun commented on code in PR #20584:
URL: https://github.com/apache/flink/pull/20584#discussion_r981897034
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java:
##########
@@ -27,27 +28,55 @@
import java.util.List;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MIN_RANGE;
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* This range generator would divide the range by the flink source parallelism. It would be the
* default implementation for {@link SubscriptionType#Key_Shared} subscription.
*/
-public class UniformRangeGenerator implements RangeGenerator {
- private static final long serialVersionUID = -7292650922683609268L;
+@PublicEvolving
+public class SplitRangeGenerator implements RangeGenerator {
+ private static final long serialVersionUID = -8682286436352905249L;
+
+ private final int start;
+ private final int end;
+
+ public SplitRangeGenerator() {
+ this(MIN_RANGE, MAX_RANGE);
+ }
+
+ public SplitRangeGenerator(int start, int end) {
+ checkArgument(
+ start >= MIN_RANGE,
+ "Start range should be equal to or great than the min range " + MIN_RANGE);
+ checkArgument(
+ end <= MAX_RANGE, "End range should below or less than the max range " + MAX_RANGE);
+ checkArgument(start <= end, "Start range should be equal to or less than the end range");
+
+ this.start = start;
+ this.end = end;
+ }
@Override
public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
Review Comment:
This change looks correct to me. However, you'd better add a test case for `parallelism = 1` to guard the edge case.
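For instance, here is a minimal sketch of what such a test could pin down. The `range()` body isn't shown in this hunk, so `SplitRangeSketch` is a hypothetical stand-in rather than the PR's code. It assumes `MIN_RANGE = 0` and `MAX_RANGE = 65535`, returns `int[]{start, end}` pairs instead of `TopicRange`, and checks that `parallelism = 1` yields exactly one range covering the whole input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SplitRangeGenerator#range; not the PR's implementation.
final class SplitRangeSketch {
    static final int MIN_RANGE = 0;      // assumed value of TopicRange.MIN_RANGE
    static final int MAX_RANGE = 65535;  // assumed value of TopicRange.MAX_RANGE

    /** Splits the inclusive range [start, end] into at most `parallelism` contiguous ranges. */
    static List<int[]> split(int start, int end, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (start < MIN_RANGE || end > MAX_RANGE || start > end) {
            throw new IllegalArgumentException("invalid range [" + start + ", " + end + "]");
        }
        long size = (long) end - start + 1; // number of hash slots in [start, end]
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            long lo = start + size * i / parallelism;
            long hi = start + size * (i + 1) / parallelism - 1;
            if (lo <= hi) { // skip empty slices when parallelism > size
                ranges.add(new int[] {(int) lo, (int) hi});
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        // The edge case from the review: a single reader must receive the whole range.
        List<int[]> ranges = split(MIN_RANGE, MAX_RANGE, 1);
        int[] only = ranges.get(0);
        System.out.println(ranges.size() + " range(s): [" + only[0] + ", " + only[1] + "]");
        // prints "1 range(s): [0, 65535]"
    }
}
```

A real test would assert the same thing against `new SplitRangeGenerator().range(metadata, 1)` and, since the two-argument constructor is public, against a sub-range such as `new SplitRangeGenerator(100, 200)`.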
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java:
##########
@@ -27,27 +28,55 @@
import java.util.List;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MIN_RANGE;
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* This range generator would divide the range by the flink source parallelism. It would be the
* default implementation for {@link SubscriptionType#Key_Shared} subscription.
*/
-public class UniformRangeGenerator implements RangeGenerator {
- private static final long serialVersionUID = -7292650922683609268L;
+@PublicEvolving
+public class SplitRangeGenerator implements RangeGenerator {
+ private static final long serialVersionUID = -8682286436352905249L;
+
+ private final int start;
+ private final int end;
+
+ public SplitRangeGenerator() {
+ this(MIN_RANGE, MAX_RANGE);
+ }
+
+ public SplitRangeGenerator(int start, int end) {
Review Comment:
`VisibleForTesting`? I guess this constructor is for testing only.
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java:
##########
@@ -18,23 +18,40 @@
package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import java.util.List;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.validateTopicRanges;
+
/** Always return the same range set for all topics. */
+@PublicEvolving
public class FixedRangeGenerator implements RangeGenerator {
Review Comment:
Is this unused, or just not tested?
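If `FixedRangeGenerator` stays, a test could at least cover the new `validateTopicRanges` call. I haven't checked what `TopicRangeUtils.validateTopicRanges` actually enforces, so `validateRanges` below is a hypothetical version of the kind of checks I'd expect: every range lies within `[0, 65535]` (assumed bounds), has `start <= end`, and does not overlap another range.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical validation sketch; not TopicRangeUtils.validateTopicRanges.
final class RangeValidationSketch {
    static final int MIN_RANGE = 0;      // assumed value of TopicRange.MIN_RANGE
    static final int MAX_RANGE = 65535;  // assumed value of TopicRange.MAX_RANGE

    /** Throws IllegalArgumentException if a range is out of bounds, inverted, or overlapping. */
    static void validateRanges(List<int[]> ranges) {
        List<int[]> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparingInt(r -> r[0])); // order by range start
        int previousEnd = -1;
        for (int[] r : sorted) {
            if (r[0] < MIN_RANGE || r[1] > MAX_RANGE || r[0] > r[1]) {
                throw new IllegalArgumentException("Invalid range [" + r[0] + ", " + r[1] + "]");
            }
            if (r[0] <= previousEnd) {
                throw new IllegalArgumentException(
                        "Range [" + r[0] + ", " + r[1] + "] overlaps a previous range");
            }
            previousEnd = r[1];
        }
    }

    /** Convenience wrapper for tests: true if validateRanges accepts the input. */
    static boolean isValid(List<int[]> ranges) {
        try {
            validateRanges(ranges);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```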
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java:
##########
@@ -51,7 +51,18 @@ public interface RangeGenerator extends Serializable {
*/
List<TopicRange> range(TopicMetadata metadata, int parallelism);
- /** Initialize some extra resource when bootstrap the source. */
+ /**
+ * Defines the default behavior for Key_Shared subscription in Flink. See {@link KeySharedMode}
+ * for the detailed usage of the key share mode.
+ *
+ * @param metadata The metadata of the topic.
+ * @param parallelism The reader size for this topic.
+ */
+ default KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {
Review Comment:
It seems no implementation uses the parameters.
##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.NONE_KEY;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.keyBytesHash;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.keyHash;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.validateTopicRanges;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Pulsar didn't expose the key hash range method. We have to provide an implementation for
+ * end-user. You can add the keys you want to consume, no need to provide any hash ranges.
+ *
+ * <p>Since the key's hash isn't specified to only one key. The consuming results may contain the
+ * messages with different keys comparing the keys you have defined in this range generator.
+ * Remember to use flink's <code>DataStream.filter()</code> method.
+ *
+ * <p>Usage: <code><pre>
+ * FixedKeysRangeGenerator.builder()
+ * .supportNullKey()
+ * .key("someKey")
+ * .keys(Arrays.asList("key1", "key2"))
+ * .build()
+ * </pre></code>
+ */
+@PublicEvolving
+public class FixedKeysRangeGenerator implements RangeGenerator {
+ private static final long serialVersionUID = 2372969466289052100L;
+
+ private final List<TopicRange> ranges;
+ private final KeySharedMode sharedMode;
+
+ private FixedKeysRangeGenerator(List<TopicRange> ranges, KeySharedMode sharedMode) {
+ this.ranges = ranges;
+ this.sharedMode = sharedMode;
+ }
+
+ @Override
+ public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
+ return ranges;
+ }
+
+ @Override
+ public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {
+ return sharedMode;
+ }
+
+ public static FixedKeysRangeGeneratorBuilder builder() {
+ return new FixedKeysRangeGeneratorBuilder();
+ }
+
+ /** The builder for {@link FixedKeysRangeGenerator}. */
+ @PublicEvolving
+ public static class FixedKeysRangeGeneratorBuilder {
+
+ private final SortedSet<Integer> keyHashes = new TreeSet<>();
+ private KeySharedMode sharedMode = KeySharedMode.JOIN;
+
+ private FixedKeysRangeGeneratorBuilder() {
+ // No public for builder
+ }
+
+ /**
+ * Some {@link Message} in Pulsar may not have {@link Message#getOrderingKey()} or {@link
+ * Message#getKey()}, use this method for supporting consuming such messages.
+ */
+ public FixedKeysRangeGeneratorBuilder supportNullKey() {
+ keyHashes.add(keyHash(NONE_KEY));
+ return this;
+ }
+
+ /**
+ * If you set the message key by using {@link PulsarMessageBuilder#key(String)} or {@link
+ * TypedMessageBuilder#key(String)}, use this method for supporting consuming such messages.
+ */
+ public FixedKeysRangeGeneratorBuilder key(String key) {
+ keyHashes.add(keyHash(key));
+ return this;
+ }
+
+ /** Same as the {@link #key(String)}, support setting multiple keys in the same time. */
+ public FixedKeysRangeGeneratorBuilder keys(Collection<String> someKeys) {
+ checkNotNull(someKeys);
+ for (String someKey : someKeys) {
+ keyHashes.add(keyHash(someKey));
+ }
+ return this;
+ }
+
+ /**
+ * If you set the message key by using {@link TypedMessageBuilder#keyBytes(byte[])}, use
+ * this method for supporting consuming such messages.
+ */
+ public FixedKeysRangeGeneratorBuilder keyBytes(byte[] keyBytes) {
+ keyHashes.add(keyBytesHash(keyBytes));
+ return this;
+ }
+
+ /**
+ * Pulsar's ordering key is prior to the message key. If you set the ordering key by using
+ * {@link PulsarMessageBuilder#orderingKey(byte[])} or {@link
+ * TypedMessageBuilder#orderingKey(byte[])}, use this method for supporting consuming such
+ * messages.
+ */
+ public FixedKeysRangeGeneratorBuilder orderingKey(byte[] keyBytes) {
+ keyHashes.add(keyHash(keyBytes));
+ return this;
+ }
+
+ /** Override the default {@link KeySharedMode#JOIN} to the mode your have provided. */
+ public FixedKeysRangeGeneratorBuilder keySharedMode(KeySharedMode sharedMode) {
+ this.sharedMode = sharedMode;
+ return this;
+ }
Review Comment:
Why would we use a different mode here? I think consuming a fixed key range should always use `JOIN`, unless the fixed range is the full range? In that case, users should use `FullRangeGenerator` or `SplitRangeGenerator` instead.
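To make the difference concrete: each fixed key contributes a single hash slot, so this generator produces a few isolated slots scattered over the hash space rather than a partition of it, and any other key that hashes into one of those slots is delivered too (hence the `DataStream.filter()` advice in the class Javadoc). A sketch, assuming Pulsar's Key_Shared sticky-key hash is MurmurHash3 x86_32 (seed 0), masked to non-negative and taken modulo 65536. I believe that matches Pulsar, but treat it as an assumption; `keySlot` and `toRanges` are hypothetical helpers, not `TopicRangeUtils` methods, and I don't know whether the PR's `build()` merges adjacent slots the way this sketch does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

final class FixedKeySlotsSketch {
    static final int RANGE_SIZE = 65536; // assumed size of Pulsar's Key_Shared hash space

    /** Standard MurmurHash3 x86_32 over a byte array. */
    static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = data.length & ~3;
        for (int i = 0; i < roundedEnd; i += 4) {
            // Little-endian 4-byte block.
            int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        int k1 = 0;
        switch (data.length & 3) { // tail bytes, intentional fall-through
            case 3:
                k1 ^= (data[roundedEnd + 2] & 0xff) << 16;
            case 2:
                k1 ^= (data[roundedEnd + 1] & 0xff) << 8;
            case 1:
                k1 ^= data[roundedEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
            default:
                break;
        }
        h1 ^= data.length; // finalization mix
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    /** Hypothetical key-to-slot mapping: non-negative Murmur3 hash modulo RANGE_SIZE. */
    static int keySlot(String key) {
        int hash = murmur3_32(key.getBytes(StandardCharsets.UTF_8), 0) & Integer.MAX_VALUE;
        return hash % RANGE_SIZE;
    }

    /** Turns sorted slots into {start, end} ranges, merging runs of adjacent slots. */
    static List<int[]> toRanges(SortedSet<Integer> slots) {
        List<int[]> ranges = new ArrayList<>();
        for (int slot : slots) {
            int[] last = ranges.isEmpty() ? null : ranges.get(ranges.size() - 1);
            if (last != null && last[1] + 1 == slot) {
                last[1] = slot; // extend the previous range
            } else {
                ranges.add(new int[] {slot, slot});
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        SortedSet<Integer> slots = new TreeSet<>();
        for (String key : new String[] {"key1", "key2", "someKey"}) {
            slots.add(keySlot(key));
        }
        // At most three tiny ranges out of 65536 slots.
        for (int[] r : toRanges(slots)) {
            System.out.println("[" + r[0] + ", " + r[1] + "]");
        }
    }
}
```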