This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new d89a0821 [FLINK-29398] Provide rack ID to KafkaSource to take 
advantage of Rack Awareness
d89a0821 is described below

commit d89a082180232bb79e3c764228c4e7dbb9eb6b8b
Author: Ethan Gouty <[email protected]>
AuthorDate: Wed Feb 8 12:11:40 2023 -0900

    [FLINK-29398] Provide rack ID to KafkaSource to take advantage of Rack 
Awareness
    
    This closes #53.
    This closes #20.
    
    Co-authored-by: Jeremy DeGroot <[email protected]>
    Co-authored-by: jcmejias1 <[email protected]>
    Co-authored-by: Mason Chen <[email protected]>
    Co-authored-by: Ethan Gouty <[email protected]>
    Co-authored-by: Siva Venkat Gogineni <[email protected]>
---
 docs/content/docs/connectors/datastream/kafka.md   | 19 ++++++++
 .../flink/connector/kafka/source/KafkaSource.java  | 17 ++++++-
 .../connector/kafka/source/KafkaSourceBuilder.java | 19 +++++++-
 .../source/reader/KafkaPartitionSplitReader.java   | 23 +++++++++
 .../reader/KafkaPartitionSplitReaderTest.java      | 43 ++++++++++++++++-
 .../kafka/source/reader/KafkaSourceReaderTest.java | 54 ++++++++++++++++++++--
 6 files changed, 168 insertions(+), 7 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/kafka.md 
b/docs/content/docs/connectors/datastream/kafka.md
index acbbbc79..422ed9e3 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -465,6 +465,25 @@ client dependencies in the job JAR, so you may need to 
rewrite it with the actua
 For detailed explanations of security configurations, please refer to
 <a href="https://kafka.apache.org/documentation/#security">the "Security" 
section in Apache Kafka documentation</a>.
 
+## Kafka Rack Awareness
+
+Kafka rack awareness allows Flink to select and control the cloud region and 
availability zone that Kafka consumers read from, based on the Rack ID. This 
feature reduces network costs and latency since it allows consumers to connect 
to the closest Kafka brokers, possibly colocated in the same cloud region and 
availability zone.
+A client's rack is indicated using the `client.rack` config, and should 
correspond to a broker's `broker.rack` config.
+
+For details, see the [`client.rack` consumer configuration](https://kafka.apache.org/documentation/#consumerconfigs_client.rack).
+
+### RackId
+
+`setRackIdSupplier()` is the builder method that lets you determine the consumer's rack. If provided, the supplier is run when the consumer is set up on the TaskManager, and the consumer's `client.rack` configuration is set to the returned value. If the supplier returns `null` or an empty string, the connector does not set `client.rack`.
+
+One way to implement this is to have the supplier read an environment variable defined on your TaskManager, for instance:
+
+```
+.setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"))
+```
+
+Here, `TM_NODE_AZ` is the name of the environment variable in the TaskManager container that contains the availability zone to use.
+
 ### Behind the Scene
 {{< hint info >}}
 If you are interested in how Kafka source works under the design of new data 
source API, you may
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 0e764649..54f5f856 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializ
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
@@ -56,6 +57,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -98,6 +100,8 @@ public class KafkaSource<OUT>
     private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
     // The configurations.
     private final Properties props;
+    // Supplier of the consumer's client.rack value; null when no rack id supplier was set.
+    @Nullable private final SerializableSupplier<String> rackIdSupplier;
 
     KafkaSource(
             KafkaSubscriber subscriber,
@@ -105,13 +109,15 @@ public class KafkaSource<OUT>
             @Nullable OffsetsInitializer stoppingOffsetsInitializer,
             Boundedness boundedness,
             KafkaRecordDeserializationSchema<OUT> deserializationSchema,
-            Properties props) {
+            Properties props,
+            @Nullable SerializableSupplier<String> rackIdSupplier) {
         this.subscriber = subscriber;
         this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
         this.boundedness = boundedness;
         this.deserializationSchema = deserializationSchema;
         this.props = props;
+        this.rackIdSupplier = rackIdSupplier;
     }
 
     /**
@@ -157,7 +163,16 @@ public class KafkaSource<OUT>
                 new KafkaSourceReaderMetrics(readerContext.metricGroup());
 
+        // The rack id supplier is not invoked here but each time a split reader is created;
+        // a null supplier results in a null rack id.
         Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
-                () -> new KafkaPartitionSplitReader(props, readerContext, 
kafkaSourceReaderMetrics);
+                () ->
+                        new KafkaPartitionSplitReader(
+                                props,
+                                readerContext,
+                                kafkaSourceReaderMetrics,
+                                Optional.ofNullable(rackIdSupplier)
+                                        .map(Supplier::get)
+                                        .orElse(null));
         KafkaRecordEmitter<OUT> recordEmitter = new 
KafkaRecordEmitter<>(deserializationSchema);
 
         return new KafkaSourceReader<>(
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index afaa72db..dcad476b 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
 import 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
 import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -80,6 +81,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
  *     
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  *     .setUnbounded(OffsetsInitializer.latest())
+ *     .setRackIdSupplier(() -> MY_RACK_ID)
  *     .build();
  * }</pre>
  *
@@ -100,6 +102,8 @@ public class KafkaSourceBuilder<OUT> {
     private KafkaRecordDeserializationSchema<OUT> deserializationSchema;
     // The configurations.
     protected Properties props;
+    // Supplier of the consumer's client.rack value; null unless one is set via setRackIdSupplier().
+    private SerializableSupplier<String> rackIdSupplier;
 
     KafkaSourceBuilder() {
         this.subscriber = null;
@@ -108,6 +112,7 @@ public class KafkaSourceBuilder<OUT> {
         this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
         this.deserializationSchema = null;
         this.props = new Properties();
+        this.rackIdSupplier = null;
     }
 
     /**
@@ -355,6 +360,17 @@ public class KafkaSourceBuilder<OUT> {
         return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
     }
 
+    /**
+     * Sets the supplier of the consumer's {@code client.rack}; null/empty results are ignored.
+     *
+     * @param rackIdCallback supplier evaluated whenever a split reader is created; may be null
+     * @return this KafkaSourceBuilder
+     */
+    public KafkaSourceBuilder<OUT> 
setRackIdSupplier(SerializableSupplier<String> rackIdCallback) {
+        this.rackIdSupplier = rackIdCallback;
+        return this;
+    }
+
     /**
      * Set an arbitrary property for the KafkaSource and KafkaConsumer. The 
valid keys can be found
      * in {@link ConsumerConfig} and {@link KafkaSourceOptions}.
@@ -422,7 +438,8 @@ public class KafkaSourceBuilder<OUT> {
                 stoppingOffsetsInitializer,
                 boundedness,
                 deserializationSchema,
-                props);
+                props,
+                rackIdSupplier);
     }
 
     // ------------- private helpers  --------------
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index f52940c4..94940b8e 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -80,11 +80,28 @@ public class KafkaPartitionSplitReader
             Properties props,
             SourceReaderContext context,
             KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
+        this(props, context, kafkaSourceReaderMetrics, null);
+    }
+
+    /**
+     * Creates a split reader whose Kafka consumer uses {@code rackId} as {@code client.rack}.
+     *
+     * @param props Kafka consumer properties
+     * @param context the source reader context
+     * @param kafkaSourceReaderMetrics metrics of the Kafka source reader
+     * @param rackId value for {@code client.rack}; not applied when null or empty
+     */
+    public KafkaPartitionSplitReader(
+            Properties props,
+            SourceReaderContext context,
+            KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+            String rackId) {
         this.subtaskId = context.getIndexOfSubtask();
         this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
         Properties consumerProps = new Properties();
         consumerProps.putAll(props);
         consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
createConsumerClientId(props));
+        setConsumerClientRack(consumerProps, rackId);
         this.consumer = new KafkaConsumer<>(consumerProps);
         this.stoppingOffsets = new HashMap<>();
         this.groupId = 
consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
@@ -256,6 +273,20 @@ public class KafkaPartitionSplitReader
 
     // --------------- private helper method ----------------------
 
+    /**
+     * Sets {@code client.rack} in the given consumer properties to {@code rackId}, unless the
+     * rack id is null or empty, in which case the properties are left unchanged.
+     *
+     * @param consumerProps consumer properties to update in place
+     * @param rackId rack id to use as {@code client.rack}; may be null or empty
+     */
+    @VisibleForTesting
+    void setConsumerClientRack(Properties consumerProps, String rackId) {
+        if (rackId != null && !rackId.isEmpty()) {
+            consumerProps.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, 
rackId);
+        }
+    }
+
     private void parseStartingOffsets(
             KafkaPartitionSplit split,
             List<TopicPartition> partitionsStartingFromEarliest,
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 7263bd02..edd41326 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EmptySource;
+import org.junit.jupiter.params.provider.NullAndEmptySource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -319,6 +320,38 @@ public class KafkaPartitionSplitReaderTest {
         
assertThat(reader.consumer().position(partition)).isEqualTo(expectedOffset);
     }
 
+    @Test
+    public void testSetConsumerClientRack() {
+        String rackId = "use1-az1";
+        Properties properties = new Properties();
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        properties,
+                        
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+                        rackId);
+
+        // Here we call the helper function directly, because the 
KafkaPartitionSplitReader
+        // doesn't allow us to examine the final ConsumerConfig object
+        reader.setConsumerClientRack(properties, rackId);
+        
assertThat(properties.get(ConsumerConfig.CLIENT_RACK_CONFIG)).isEqualTo(rackId);
+    }
+
+    @ParameterizedTest
+    @NullAndEmptySource
+    public void testSetConsumerClientRackIgnoresNullAndEmpty(String rackId) {
+        Properties properties = new Properties();
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        properties,
+                        
UnregisteredMetricsGroup.createSourceReaderMetricGroup(),
+                        rackId);
+
+        // Here we call the helper function directly, because the 
KafkaPartitionSplitReader
+        // doesn't allow us to examine the final ConsumerConfig object
+        reader.setConsumerClientRack(properties, rackId);
+        
assertThat(properties.containsKey(ConsumerConfig.CLIENT_RACK_CONFIG)).isFalse();
+    }
+
     // ------------------
 
     private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader 
reader, int readerId)
@@ -383,6 +416,14 @@ public class KafkaPartitionSplitReaderTest {
 
     private KafkaPartitionSplitReader createReader(
             Properties additionalProperties, SourceReaderMetricGroup 
sourceReaderMetricGroup) {
+        return createReader(additionalProperties, sourceReaderMetricGroup, 
null);
+    }
+
+    /** Creates a split reader that passes {@code rackId} (possibly null) to its constructor. */
+    private KafkaPartitionSplitReader createReader(
+            Properties additionalProperties,
+            SourceReaderMetricGroup sourceReaderMetricGroup,
+            String rackId) {
         Properties props = new Properties();
         
props.putAll(KafkaSourceTestEnv.getConsumerProperties(ByteArrayDeserializer.class));
         props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
@@ -394,7 +435,8 @@ public class KafkaPartitionSplitReaderTest {
         return new KafkaPartitionSplitReader(
                 props,
                 new TestingReaderContext(new Configuration(), 
sourceReaderMetricGroup),
-                kafkaSourceReaderMetrics);
+                kafkaSourceReaderMetrics,
+                rackId);
     }
 
     private Map<String, KafkaPartitionSplit> assignSplits(
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index e4ee39d2..7f879f21 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
 import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -53,6 +54,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -79,6 +81,8 @@ import static 
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderM
 import static 
org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 /** Unit tests for {@link KafkaSourceReader}. */
 public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSplit> {
@@ -271,7 +275,8 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
                                 Boundedness.CONTINUOUS_UNBOUNDED,
                                 new TestingReaderContext(),
                                 (ignore) -> {},
-                                properties)) {
+                                properties,
+                                null)) {
             reader.addSplits(
                     getSplits(numSplits, NUM_RECORDS_PER_SPLIT, 
Boundedness.CONTINUOUS_UNBOUNDED));
             ValidatingSourceOutput output = new ValidatingSourceOutput();
@@ -479,6 +484,47 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
         }
     }
 
+    @Test
+    public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws 
Exception {
+        @SuppressWarnings("unchecked")
+        SerializableSupplier<String> rackIdSupplier = 
Mockito.mock(SerializableSupplier.class);
+
+        try (KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {},
+                                new Properties(),
+                                rackIdSupplier)) {
+            // Intentionally empty: no splits are assigned, so the supplier must not be called.
+        }
+
+        verify(rackIdSupplier, never()).get();
+    }
+
+    @Test
+    public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws 
Exception {
+        @SuppressWarnings("unchecked")
+        SerializableSupplier<String> rackIdSupplier = 
Mockito.mock(SerializableSupplier.class);
+        Mockito.when(rackIdSupplier.get()).thenReturn("use1-az1");
+
+        try (KafkaSourceReader<Integer> reader =
+                (KafkaSourceReader<Integer>)
+                        createReader(
+                                Boundedness.CONTINUOUS_UNBOUNDED,
+                                new TestingReaderContext(),
+                                (ignore) -> {},
+                                new Properties(),
+                                rackIdSupplier)) {
+            reader.addSplits(
+                    Collections.singletonList(
+                            new KafkaPartitionSplit(new TopicPartition(TOPIC, 
1), 1L)));
+        }
+
+        verify(rackIdSupplier).get();
+    }
+
     // ------------------------------------------
 
     @Override
@@ -535,14 +581,15 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
             throws Exception {
         Properties properties = new Properties();
         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        return createReader(boundedness, context, splitFinishedHook, 
properties);
+        return createReader(boundedness, context, splitFinishedHook, 
properties, null);
     }
 
     private SourceReader<Integer, KafkaPartitionSplit> createReader(
             Boundedness boundedness,
             SourceReaderContext context,
             Consumer<Collection<String>> splitFinishedHook,
-            Properties props)
+            Properties props,
+            SerializableSupplier<String> rackIdSupplier)
             throws Exception {
         KafkaSourceBuilder<Integer> builder =
                 KafkaSource.<Integer>builder()
@@ -559,6 +606,9 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
         if (boundedness == Boundedness.BOUNDED) {
             builder.setBounded(OffsetsInitializer.latest());
         }
+        if (rackIdSupplier != null) {
+            builder.setRackIdSupplier(rackIdSupplier);
+        }
 
         return KafkaSourceTestUtils.createReaderWithFinishedSplitHook(
                 builder.build(), context, splitFinishedHook);

Reply via email to