This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7797e93 [INLONG-2046][Bug]inlong-sort could not identify tube master
address when inlong-sort and tube-master were deployed in the same machine
(#2047)
7797e93 is described below
commit 7797e933e257be47286eec4349602c99c487fea5
Author: gosonzhang <[email protected]>
AuthorDate: Tue Dec 21 19:41:03 2021 +0800
[INLONG-2046][Bug]inlong-sort could not identify tube master address when
inlong-sort and tube-master were deployed in the same machine (#2047)
---
.../flink/tubemq/MultiTenancyTubeConsumer.java | 18 ++++-----------
.../connectors/tubemq/TubemqSourceFunction.java | 26 +++++-----------------
2 files changed, 10 insertions(+), 34 deletions(-)
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
index 9601541..20cdd26 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/tubemq/MultiTenancyTubeConsumer.java
@@ -20,8 +20,6 @@ package org.apache.inlong.sort.flink.tubemq;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
@@ -36,7 +34,6 @@ import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.flink.runtime.net.ConnectionUtils;
import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.util.TimeUtils;
import org.apache.inlong.sort.configuration.Configuration;
@@ -46,6 +43,7 @@ import
org.apache.inlong.sort.flink.TDMsgMixedSerializedRecord;
import org.apache.inlong.sort.meta.MetaManager;
import org.apache.inlong.sort.util.CommonUtils;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -195,14 +193,6 @@ public class MultiTenancyTubeConsumer {
LOG.warn("Invalid master address {} provided of {}",
tubeMasterAddresses, topic);
return;
}
- final Pair<String, Integer> firstTubeMasterAddress =
parsedTubeMasterAddress.get(0);
- // get local address for connecting
- InetSocketAddress firstSocketAddress = new
InetSocketAddress(firstTubeMasterAddress.getLeft(),
- firstTubeMasterAddress.getRight());
- InetAddress localAddress = ConnectionUtils.findConnectingAddress(
- firstSocketAddress, 2000L, 400L);
- String localhost = localAddress.getHostAddress();
-
final Map<String, Long> partitionToOffset;
synchronized (context.getCheckpointLock()) {
if (topicToOffset.containsKey(topic)) {
@@ -214,13 +204,13 @@ public class MultiTenancyTubeConsumer {
}
// initialize ConsumerConfig
- ConsumerConfig consumerConfig = new ConsumerConfig(localhost,
tubeMasterAddresses, consumerGroup);
+ ConsumerConfig consumerConfig = new
ConsumerConfig(tubeMasterAddresses, consumerGroup);
// consumeFromMax works only if there is no offset restored from checkpoint
if (consumeFromMax && partitionToOffset.isEmpty()) {
- consumerConfig.setConsumeModel(1);
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS);
LOG.info("Would consume {} from max offset", topic);
} else {
- consumerConfig.setConsumeModel(0);
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
}
consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
index f64e581..368af08 100644
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
+++ b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
@@ -20,11 +20,8 @@ package org.apache.flink.connectors.tubemq;
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static
org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.TimeUtils.parseDuration;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -45,6 +42,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
@@ -210,24 +208,12 @@ public class TubemqSourceFunction<T>
@Override
public void open(Configuration parameters) throws Exception {
-
- String firstAddress = masterAddress.split(SPLIT_COMMA)[0];
- String[] firstAddressSegments = firstAddress.split(SPLIT_COLON);
- String firstHost = firstAddressSegments[0];
- int firstPort = Integer.parseInt(firstAddressSegments[1]);
- InetSocketAddress firstSocketAddress =
- new InetSocketAddress(firstHost, firstPort);
-
- InetAddress localAddress =
- findConnectingAddress(firstSocketAddress, 2000, 400);
- String localhost = localAddress.getHostAddress();
-
ConsumerConfig consumerConfig =
- new ConsumerConfig(localhost, masterAddress, consumerGroup);
- consumerConfig
- .setConsumeModel(consumeFromMax ? 1 : 0);
- consumerConfig
- .setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
+ new ConsumerConfig(masterAddress, consumerGroup);
+ consumerConfig.setConsumePosition(consumeFromMax
+ ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS
+ : ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();