This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3961e9d  [INLONG-3256][SDK] Modify log printing level and method for 
Sort-SDK (#3259)
3961e9d is described below

commit 3961e9d90acfbb9ecbf2f1b6e309ce959f010bf8
Author: wardli <[email protected]>
AuthorDate: Mon Mar 21 22:50:40 2022 +0800

    [INLONG-3256][SDK] Modify log printing level and method for Sort-SDK (#3259)
---
 .../inlong/sdk/sort/impl/InLongTopicManagerImpl.java  | 19 +++++++++----------
 .../inlong/sdk/sort/impl/QueryConsumeConfigImpl.java  |  1 -
 .../sdk/sort/impl/kafka/AckOffsetOnRebalance.java     |  4 ++--
 .../sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java   |  6 ++++--
 .../sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java | 15 +++------------
 .../sdk/sort/impl/tube/InLongTubeFetcherImpl.java     | 13 ++-----------
 .../org/apache/inlong/sdk/sort/util/StringUtil.java   |  2 +-
 .../sort/impl/pulsar/InLongPulsarFetcherImplTest.java |  6 ------
 .../sdk/sort/impl/tube/InLongTubeFetcherImplTest.java |  6 ------
 9 files changed, 21 insertions(+), 51 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
index f225502..a30f7f4 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/InLongTopicManagerImpl.java
@@ -209,7 +209,6 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
             return true;
         } catch (Throwable th) {
             logger.error("close error " + sortTaskId, th);
-
         }
         return false;
     }
@@ -283,19 +282,19 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
         }
 
         List<InLongTopic> newConsumeConfig = new 
ArrayList<>(currentConsumeConfig);
-        logger.info("newConsumeConfig List:{}", 
Arrays.toString(newConsumeConfig.toArray()));
+        logger.debug("newConsumeConfig List:{}", 
Arrays.toString(newConsumeConfig.toArray()));
         List<String> newTopics = getNewTopics(newConsumeConfig);
-        logger.info("newTopics :{}", Arrays.toString(newTopics.toArray()));
+        logger.debug("newTopics :{}", Arrays.toString(newTopics.toArray()));
 
         List<String> oldInLongTopics = new ArrayList<>(fetchers.keySet());
-        logger.info("oldInLongTopics :{}", 
Arrays.toString(oldInLongTopics.toArray()));
+        logger.debug("oldInLongTopics :{}", 
Arrays.toString(oldInLongTopics.toArray()));
         //get need be offlined topics
         oldInLongTopics.removeAll(newTopics);
-        logger.info("removed oldInLongTopics :{}", 
Arrays.toString(oldInLongTopics.toArray()));
+        logger.debug("removed oldInLongTopics :{}", 
Arrays.toString(oldInLongTopics.toArray()));
 
         //get new topics
         newTopics.removeAll(new ArrayList<>(fetchers.keySet()));
-        logger.info("really new topics :{}", 
Arrays.toString(newTopics.toArray()));
+        logger.debug("really new topics :{}", 
Arrays.toString(newTopics.toArray()));
         //offline need be offlined topics
         offlineRmovedTopic(oldInLongTopics);
         //online new topics
@@ -338,7 +337,7 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
     private void onlineNewTopic(List<InLongTopic> newSubscribedInLongTopics, 
List<String> reallyNewTopic) {
         for (InLongTopic inLongTopic : newSubscribedInLongTopics) {
             if (!reallyNewTopic.contains(inLongTopic.getTopicKey())) {
-                
logger.info("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
+                
logger.debug("!reallyNewTopic.contains(inLongTopic.getTopicKey())");
                 continue;
             }
             onlineTopic(inLongTopic);
@@ -377,7 +376,7 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
                             
.authentication(AuthenticationFactory.token(inLongTopic.getInLongCluster().getToken()))
                             .build();
                     
pulsarClients.put(inLongTopic.getInLongCluster().getClusterId(), pulsarClient);
-                    logger.info("create pulsar client succ {}",
+                    logger.debug("create pulsar client succ {}",
                             new 
String[]{inLongTopic.getInLongCluster().getClusterId(),
                                     
inLongTopic.getInLongCluster().getBootstraps(),
                                     
inLongTopic.getInLongCluster().getToken()});
@@ -387,7 +386,7 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
                     return false;
                 }
             } else {
-                logger.info("bootstrap is null {}", 
inLongTopic.getInLongCluster());
+                logger.error("bootstrap is null {}", 
inLongTopic.getInLongCluster());
                 return false;
             }
         }
@@ -405,7 +404,7 @@ public class InLongTopicManagerImpl extends 
InLongTopicManager {
                     TubeConsumerCreater tubeConsumerCreater = new 
TubeConsumerCreater(messageSessionFactory,
                             tubeConfig);
                     
tubeFactories.put(inLongTopic.getInLongCluster().getClusterId(), 
tubeConsumerCreater);
-                    logger.info("create tube client succ {} {} {}",
+                    logger.debug("create tube client succ {} {} {}",
                             new 
String[]{inLongTopic.getInLongCluster().getClusterId(),
                                     
inLongTopic.getInLongCluster().getBootstraps(),
                                     
inLongTopic.getInLongCluster().getToken()});
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index 5265f80..1889e80 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -121,7 +121,6 @@ public class QueryConsumeConfigImpl implements 
QueryConsumeConfig {
                             e.getMessage());
             logger.error(msg, e);
         }
-        logger.debug("end to reload manager config.");
     }
 
     /**
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
index c09385b..efeec27 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/AckOffsetOnRebalance.java
@@ -44,7 +44,7 @@ public class AckOffsetOnRebalance implements 
ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-        logger.info("*- in ralance:onPartitionsRevoked");
+        logger.debug("*- in ralance:onPartitionsRevoked");
         while (!commitOffsetMap.isEmpty()) {
             consumer.commitSync(commitOffsetMap);
         }
@@ -52,7 +52,7 @@ public class AckOffsetOnRebalance implements 
ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> collection) {
-        logger.info("*- in ralance:onPartitionsAssigned  ");
+        logger.debug("*- in ralance:onPartitionsAssigned  ");
         Map<TopicPartition, OffsetAndMetadata> committed = 
consumer.committed(new HashSet<>(collection));
         for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
committed.entrySet()) {
             consumer.seek(entry.getKey(), entry.getValue().offset());
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
index 531e78f..8dfd278 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
@@ -74,7 +74,7 @@ public class InLongKafkaFetcherImpl extends 
InLongTopicFetcher {
                 return false;
             }
             this.bootstrapServers = bootstrapServers;
-            String threadName = "sort_sdk_fetch_thread_" + 
StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
+            String threadName = "sort_sdk_fetch_thread_" + 
StringUtil.formatDate(new Date());
             this.fetchThread = new Thread(new Fetcher(), threadName);
             this.fetchThread.start();
         } catch (Exception e) {
@@ -91,6 +91,8 @@ public class InLongKafkaFetcherImpl extends 
InLongTopicFetcher {
             TopicPartition topicPartition = new 
TopicPartition(inLongTopic.getTopic(), Integer.parseInt(offset[0]));
             OffsetAndMetadata offsetAndMetadata = new 
OffsetAndMetadata(Long.parseLong(offset[1]));
             commitOffsetMap.put(topicPartition, offsetAndMetadata);
+        } else {
+            throw new Exception("offset is illegal, the correct format is 
int:long ,the error offset is:" + msgOffset);
         }
     }
 
@@ -123,7 +125,7 @@ public class InLongKafkaFetcherImpl extends 
InLongTopicFetcher {
 
     @Override
     public boolean isClosed() {
-        return false;
+        return closed;
     }
 
     @Override
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
index 112a959..53841f8 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
@@ -98,7 +98,7 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
                     
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
                             inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
                             .addAckFailTimes(1L);
-                    logger.error("consumer == null");
+                    logger.error("consumer == null {}", inLongTopic);
                     return;
                 }
                 MessageId messageId = offsetCache.get(msgOffset);
@@ -106,13 +106,13 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
                     
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
                             inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
                             .addAckFailTimes(1L);
-                    logger.error("messageId == null");
+                    logger.error("messageId == null {}", inLongTopic);
                     return;
                 }
                 consumer.acknowledgeAsync(messageId)
                         .thenAccept(consumer -> ackSucc(msgOffset))
                         .exceptionally(exception -> {
-                            logger.error("ack fail:{}", msgOffset);
+                            logger.error("ack fail:{} {}", inLongTopic, 
msgOffset);
                             
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
                                     
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
                                     .addAckFailTimes(1L);
@@ -163,15 +163,6 @@ public class InLongPulsarFetcherImpl extends 
InLongTopicFetcher {
     }
 
     /**
-     * isValidState
-     */
-    public void isValidState() {
-        if (closed) {
-            throw new IllegalStateException(inLongTopic + " closed.");
-        }
-    }
-
-    /**
      * pause
      */
     @Override
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
index 7540a19..812f1ad 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImpl.java
@@ -151,7 +151,7 @@ public class InLongTubeFetcherImpl extends 
InLongTopicFetcher {
 
     @Override
     public boolean isClosed() {
-        return this.closed;
+        return closed;
     }
 
     @Override
@@ -161,7 +161,7 @@ public class InLongTubeFetcherImpl extends 
InLongTopicFetcher {
 
     @Override
     public boolean isConsumeStop() {
-        return this.isStopConsume;
+        return isStopConsume;
     }
 
     @Override
@@ -179,15 +179,6 @@ public class InLongTubeFetcherImpl extends 
InLongTopicFetcher {
         return 0L;
     }
 
-    /**
-     * isValidState
-     */
-    public void isValidState() {
-        if (closed) {
-            throw new IllegalStateException(inLongTopic + " closed.");
-        }
-    }
-
     public class Fetcher implements Runnable {
 
         /**
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
index ed0f3c8..6bacc8c 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/util/StringUtil.java
@@ -102,7 +102,7 @@ public class StringUtil {
      * @return String
      */
     public static String formatDate(Date date) {
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         return sdf.format(date);
     }
 
diff --git 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
index e8365d3..cb439d5 100644
--- 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
+++ 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImplTest.java
@@ -162,12 +162,6 @@ public class InLongPulsarFetcherImplTest {
     }
 
     @Test
-    public void isValidState() {
-        InLongPulsarFetcherImpl inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
-        inLongTopicFetcher.isValidState();
-    }
-
-    @Test
     public void pause() {
         InLongTopicFetcher inLongTopicFetcher = new 
InLongPulsarFetcherImpl(inLongTopic, clientContext);
         inLongTopicFetcher.pause();
diff --git 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
index 9ef2762..b6c6d64 100644
--- 
a/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
+++ 
b/inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/impl/tube/InLongTubeFetcherImplTest.java
@@ -70,12 +70,6 @@ public class InLongTubeFetcherImplTest {
     }
 
     @Test
-    public void isValidState() {
-        InLongTubeFetcherImpl inLongTopicFetcher = new 
InLongTubeFetcherImpl(inLongTopic, clientContext);
-        inLongTopicFetcher.isValidState();
-    }
-
-    @Test
     public void pause() {
         InLongTubeFetcherImpl inLongTopicFetcher = new 
InLongTubeFetcherImpl(inLongTopic, clientContext);
         inLongTopicFetcher.pause();

Reply via email to