This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7797e93  [INLONG-2046][Bug]inlong-sort could not identify tube master address when inlong-sort and tube-master were deployed in the same machine (#2047)
7797e93 is described below

commit 7797e933e257be47286eec4349602c99c487fea5
Author: gosonzhang <[email protected]>
AuthorDate: Tue Dec 21 19:41:03 2021 +0800

    [INLONG-2046][Bug]inlong-sort could not identify tube master address when inlong-sort and tube-master were deployed in the same machine (#2047)
---
 .../flink/tubemq/MultiTenancyTubeConsumer.java     | 18 ++++-----------
 .../connectors/tubemq/TubemqSourceFunction.java    | 26 +++++-----------------
 2 files changed, 10 insertions(+), 34 deletions(-)

diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
index 9601541..20cdd26 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
@@ -20,8 +20,6 @@ package org.apache.inlong.sort.flink.tubemq;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -36,7 +34,6 @@ import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flink.runtime.net.ConnectionUtils;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.util.TimeUtils;
 import org.apache.inlong.sort.configuration.Configuration;
@@ -46,6 +43,7 @@ import 
org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
 import org.apache.inlong.sort.meta.MetaManager;
 import org.apache.inlong.sort.util.CommonUtils;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -195,14 +193,6 @@ public class MultiTenancyTubeConsumer {
             LOG.warn("Invalid master address {} provided of {}", 
tubeMasterAddresses, topic);
             return;
         }
-        final Pair<String, Integer> firstTubeMasterAddress = 
parsedTubeMasterAddress.get(0);
-        // get local address for connecting
-        InetSocketAddress firstSocketAddress = new 
InetSocketAddress(firstTubeMasterAddress.getLeft(),
-                firstTubeMasterAddress.getRight());
-        InetAddress localAddress = ConnectionUtils.findConnectingAddress(
-                firstSocketAddress, 2000L, 400L);
-        String localhost = localAddress.getHostAddress();
-
         final Map<String, Long> partitionToOffset;
         synchronized (context.getCheckpointLock()) {
             if (topicToOffset.containsKey(topic)) {
@@ -214,13 +204,13 @@ public class MultiTenancyTubeConsumer {
         }
 
         // initialize ConsumerConfig
-        ConsumerConfig consumerConfig = new ConsumerConfig(localhost, 
tubeMasterAddresses, consumerGroup);
+        ConsumerConfig consumerConfig = new 
ConsumerConfig(tubeMasterAddresses, consumerGroup);
         // consumeFromMax works only if there is no offset restored from checkpoint
         if (consumeFromMax && partitionToOffset.isEmpty()) {
-            consumerConfig.setConsumeModel(1);
+            
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS);
             LOG.info("Would consume {} from max offset", topic);
         } else {
-            consumerConfig.setConsumeModel(0);
+            
consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         }
         
consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
 
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
index f64e581..368af08 100644
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
+++ b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
@@ -20,11 +20,8 @@ package org.apache.flink.connectors.tubemq;
 
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static 
org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.TimeUtils.parseDuration;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -45,6 +42,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
 import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
@@ -210,24 +208,12 @@ public class TubemqSourceFunction<T>
 
     @Override
     public void open(Configuration parameters) throws Exception {
-
-        String firstAddress = masterAddress.split(SPLIT_COMMA)[0];
-        String[] firstAddressSegments = firstAddress.split(SPLIT_COLON);
-        String firstHost = firstAddressSegments[0];
-        int firstPort = Integer.parseInt(firstAddressSegments[1]);
-        InetSocketAddress firstSocketAddress =
-            new InetSocketAddress(firstHost, firstPort);
-
-        InetAddress localAddress =
-            findConnectingAddress(firstSocketAddress, 2000, 400);
-        String localhost = localAddress.getHostAddress();
-
         ConsumerConfig consumerConfig =
-            new ConsumerConfig(localhost, masterAddress, consumerGroup);
-        consumerConfig
-            .setConsumeModel(consumeFromMax ? 1 : 0);
-        consumerConfig
-            .setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
+            new ConsumerConfig(masterAddress, consumerGroup);
+        consumerConfig.setConsumePosition(consumeFromMax
+                ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS
+                : ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+        
consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
 
         final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
 

Reply via email to