This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3961e9d [INLONG-3256][SDK] Modify log printing level and method for
Sort-SDK (#3259)
3961e9d is described below
commit 3961e9d90acfbb9ecbf2f1b6e309ce959f010bf8
Author: wardli <[email protected]>
AuthorDate: Mon Mar 21 22:50:40 2022 +0800
[INLONG-3256][SDK] Modify log printing level and method for Sort-SDK (#3259)
---
.../inlong/sdk/sort/impl/InLongTopicManagerImpl.java | 19 +++++++++----------
.../inlong/sdk/sort/impl/QueryConsumeConfigImpl.java | 1 -
.../sdk/sort/impl/kafka/AckOffsetOnRebalance.java | 4 ++--
.../sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java | 6 ++++--
.../sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java | 15 +++------------
.../sdk/sort/impl/tube/InLongTubeFetcherImpl.java | 13 ++-----------
.../org/apache/inlong/sdk/sort/util/StringUtil.java | 2 +-
.../sort/impl/pulsar/InLongPulsarFetcherImplTest.java | 6 ------
.../sdk/sort/impl/tube/InLongTubeFetcherImplTest.java | 6 ------
9 files changed, 21 insertions(+), 51 deletions(-)
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
index f225502..a30f7f4 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
@@ -209,7 +209,6 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
return true;
} catch (Throwable th) {
logger.error("close error " + sortTaskId, th);
-
}
return false;
}
@@ -283,19 +282,19 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
}
List<InLongTopic> newConsumeConfig = new
ArrayList<>(currentConsumeConfig);
- logger.info("newConsumeConfig List:{}",
Arrays.toString(newConsumeConfig.toArray()));
+ logger.debug("newConsumeConfig List:{}",
Arrays.toString(newConsumeConfig.toArray()));
List<String> newTopics = getNewTopics(newConsumeConfig);
- logger.info("newTopics :{}", Arrays.toString(newTopics.toArray()));
+ logger.debug("newTopics :{}", Arrays.toString(newTopics.toArray()));
List<String> oldInLongTopics = new ArrayList<>(fetchers.keySet());
- logger.info("oldInLongTopics :{}",
Arrays.toString(oldInLongTopics.toArray()));
+ logger.debug("oldInLongTopics :{}",
Arrays.toString(oldInLongTopics.toArray()));
//get need be offlined topics
oldInLongTopics.removeAll(newTopics);
- logger.info("removed oldInLongTopics :{}",
Arrays.toString(oldInLongTopics.toArray()));
+ logger.debug("removed oldInLongTopics :{}",
Arrays.toString(oldInLongTopics.toArray()));
//get new topics
newTopics.removeAll(new ArrayList<>(fetchers.keySet()));
- logger.info("really new topics :{}",
Arrays.toString(newTopics.toArray()));
+ logger.debug("really new topics :{}",
Arrays.toString(newTopics.toArray()));
//offline need be offlined topics
offlineRmovedTopic(oldInLongTopics);
//online new topics
@@ -338,7 +337,7 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
private void onlineNewTopic(List<InLongTopic> newSubscribedInLongTopics,
List<String> reallyNewTopic) {
for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
if (!reallyNewTopic.contains(inLongTopic.getTopicKey())) {
-
logger.info("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
+
logger.debug("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
continue;
}
onlineTopic(inLongTopic);
@@ -377,7 +376,7 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
.authentication(AuthenticationFactory.token(inLongTopic.getInLongCluster().getToken()))
.build();
pulsarClients.put(inLongTopic.getInLongCluster().getClusterId(), pulsarClient);
- logger.info("create pulsar client succ {}",
+ logger.debug("create pulsar client succ {}",
new
String[]{inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getInLongCluster().getBootstraps(),
inLongTopic.getInLongCluster().getToken()});
@@ -387,7 +386,7 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
return false;
}
} else {
- logger.info("bootstrap is null {}",
inLongTopic.getInLongCluster());
+ logger.error("bootstrap is null {}",
inLongTopic.getInLongCluster());
return false;
}
}
@@ -405,7 +404,7 @@ public class InLongTopicManagerImpl extends
InLongTopicManager {
TubeConsumerCreater tubeConsumerCreater = new
TubeConsumerCreater(messageSessionFactory,
tubeConfig);
tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(),
tubeConsumerCreater);
- logger.info("create tube client succ {} {} {}",
+ logger.debug("create tube client succ {} {} {}",
new
String[]{inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getInLongCluster().getBootstraps(),
inLongTopic.getInLongCluster().getToken()});
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index 5265f80..1889e80 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -121,7 +121,6 @@ public class QueryConsumeConfigImpl implements
QueryConsumeConfig {
e.getMessage());
logger.error(msg, e);
}
- logger.debug("end to reload manager config.");
}
/**
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
index c09385b..efeec27 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
@@ -44,7 +44,7 @@ public class AckOffsetOnRebalance implements
ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
- logger.info("*- in ralance:onPartitionsRevoked");
+ logger.debug("*- in ralance:onPartitionsRevoked");
while (!commitOffsetMap.isEmpty()) {
consumer.commitSync(commitOffsetMap);
}
@@ -52,7 +52,7 @@ public class AckOffsetOnRebalance implements
ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
- logger.info("*- in ralance:onPartitionsAssigned ");
+ logger.debug("*- in ralance:onPartitionsAssigned ");
Map<TopicPartition, OffsetAndMetadata> committed =
consumer.committed(new HashSet<>(collection));
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry :
committed.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
index 531e78f..8dfd278 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
@@ -74,7 +74,7 @@ public class InLongKafkaFetcherImpl extends
InLongTopicFetcher {
return false;
}
this.bootstrapServers = bootstrapServers;
- String threadName = "sort_sdk_fetch_thread_" +
StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
+ String threadName = "sort_sdk_fetch_thread_" +
StringUtil.formatDate(new Date());
this.fetchThread = new Thread(new Fetcher(), threadName);
this.fetchThread.start();
} catch (Exception e) {
@@ -91,6 +91,8 @@ public class InLongKafkaFetcherImpl extends
InLongTopicFetcher {
TopicPartition topicPartition = new
TopicPartition(inLongTopic.getTopic(), Integer.parseInt(offset[0]));
OffsetAndMetadata offsetAndMetadata = new
OffsetAndMetadata(Long.parseLong(offset[1]));
commitOffsetMap.put(topicPartition, offsetAndMetadata);
+ } else {
+ throw new Exception("offset is illegal, the correct format is
int:long ,the error offset is:" + msgOffset);
}
}
@@ -123,7 +125,7 @@ public class InLongKafkaFetcherImpl extends
InLongTopicFetcher {
@Override
public boolean isClosed() {
- return false;
+ return closed;
}
@Override
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
index 112a959..53841f8 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
@@ -98,7 +98,7 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
.addAckFailTimes(1L);
- logger.error("consumer == null");
+ logger.error("consumer == null {}", inLongTopic);
return;
}
MessageId messageId = offsetCache.get(msgOffset);
@@ -106,13 +106,13 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(),
inLongTopic.getTopic())
.addAckFailTimes(1L);
- logger.error("messageId == null");
+ logger.error("messageId == null {}", inLongTopic);
return;
}
consumer.acknowledgeAsync(messageId)
.thenAccept(consumer -> ackSucc(msgOffset))
.exceptionally(exception -> {
- logger.error("ack fail:{}", msgOffset);
+ logger.error("ack fail:{} {}", inLongTopic,
msgOffset);
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
.addAckFailTimes(1L);
@@ -163,15 +163,6 @@ public class InLongPulsarFetcherImpl extends
InLongTopicFetcher {
}
/**
- * isValidState
- */
- public void isValidState() {
- if (closed) {
- throw new IllegalStateException(inLongTopic + " closed.");
- }
- }
-
- /**
* pause
*/
@Override
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
index 7540a19..812f1ad 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
@@ -151,7 +151,7 @@ public class InLongTubeFetcherImpl extends
InLongTopicFetcher {
@Override
public boolean isClosed() {
- return this.closed;
+ return closed;
}
@Override
@@ -161,7 +161,7 @@ public class InLongTubeFetcherImpl extends
InLongTopicFetcher {
@Override
public boolean isConsumeStop() {
- return this.isStopConsume;
+ return isStopConsume;
}
@Override
@@ -179,15 +179,6 @@ public class InLongTubeFetcherImpl extends
InLongTopicFetcher {
return 0L;
}
- /**
- * isValidState
- */
- public void isValidState() {
- if (closed) {
- throw new IllegalStateException(inLongTopic + " closed.");
- }
- }
-
public class Fetcher implements Runnable {
/**
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
index ed0f3c8..6bacc8c 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
@@ -102,7 +102,7 @@ public class StringUtil {
* @return String
*/
public static String formatDate(Date date) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(date);
}
diff --git
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
index e8365d3..cb439d5 100644
---
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
+++
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
@@ -162,12 +162,6 @@ public class InLongPulsarFetcherImplTest {
}
@Test
- public void isValidState() {
- InLongPulsarFetcherImpl inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
- inLongTopicFetcher.isValidState();
- }
-
- @Test
public void pause() {
InLongTopicFetcher inLongTopicFetcher = new
InLongPulsarFetcherImpl(inLongTopic, clientContext);
inLongTopicFetcher.pause();
diff --git
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
index 9ef2762..b6c6d64 100644
---
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
+++
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
@@ -70,12 +70,6 @@ public class InLongTubeFetcherImplTest {
}
@Test
- public void isValidState() {
- InLongTubeFetcherImpl inLongTopicFetcher = new
InLongTubeFetcherImpl(inLongTopic, clientContext);
- inLongTopicFetcher.isValidState();
- }
-
- @Test
public void pause() {
InLongTubeFetcherImpl inLongTopicFetcher = new
InLongTubeFetcherImpl(inLongTopic, clientContext);
inLongTopicFetcher.pause();