This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab27830e8 [INLONG-4453][Sort-Standalone] Fix bug that report wrong
audit when send to kafka failed (#4454)
ab27830e8 is described below
commit ab27830e822b65696655df065cc7a38f08650073
Author: vernedeng <[email protected]>
AuthorDate: Wed Jun 1 17:37:26 2022 +0800
[INLONG-4453][Sort-Standalone] Fix bug that report wrong audit when send to
kafka failed (#4454)
---
.../standalone/sink/kafka/KafkaFederationSink.java | 13 +------------
.../sink/kafka/KafkaFederationWorker.java | 12 ++----------
.../sink/kafka/KafkaProducerCluster.java | 5 +++--
.../standalone/source/sortsdk/FetchCallback.java | 22 ++++++++++------------
.../standalone/source/sortsdk/SortSdkSource.java | 16 ++++++++--------
5 files changed, 24 insertions(+), 44 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
index b163ef198..ac10fa9ac 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSink.java
@@ -21,35 +21,24 @@ import org.apache.flume.Context;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
-import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
public class KafkaFederationSink extends AbstractSink implements Configurable {
private static final Logger LOG =
InlongLoggerFactory.getLogger(KafkaFederationSink.class);
private Context parentContext;
private KafkaFederationSinkContext context;
private List<KafkaFederationWorker> workers = new ArrayList<>();
- private Map<String, String> dimensions;
/** init and start workers */
@Override
public void start() {
String sinkName = this.getName();
- if (getChannel() == null) {
- LOG.error("channel is null");
- }
- this.context = new KafkaFederationSinkContext(getName(),
parentContext, getChannel());
+ this.context = new KafkaFederationSinkContext(sinkName, parentContext,
getChannel());
this.context.start();
- this.dimensions = new HashMap<>();
- this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID,
this.context.getClusterId());
- this.dimensions.put(SortMetricItem.KEY_TASK_NAME,
this.context.getTaskName());
- this.dimensions.put(SortMetricItem.KEY_SINK_ID,
this.context.getSinkName());
// create worker
for (int i = 0; i < context.getMaxThreads(); i++) {
KafkaFederationWorker worker = new KafkaFederationWorker(sinkName,
i, context);
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
index 97f26c026..f3d7127d2 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationWorker.java
@@ -28,7 +28,6 @@ import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.util.Map;
@@ -89,10 +88,6 @@ public class KafkaFederationWorker extends Thread {
Transaction tx = null;
try {
Channel channel = context.getChannel();
- if (channel == null) {
- LOG.error("in kafka worker, channel is null ");
- break;
- }
tx = channel.getTransaction();
tx.begin();
Event rowEvent = channel.take();
@@ -140,11 +135,8 @@ public class KafkaFederationWorker extends Thread {
String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
String topic = this.context.getTopic(uid);
- if (!StringUtils.isBlank(topic)) {
- headers.put(Constants.TOPIC, topic);
- return topic;
- }
- return "-";
+ headers.put(Constants.TOPIC, topic);
+ return topic;
}
/** sleepOneInterval */
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 52c8bdb4c..94d468760 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -132,6 +132,7 @@ public class KafkaProducerCluster implements LifecycleAware
{
tx.commit();
profileEvent.ack();
tx.close();
+ sinkContext.addSendResultMetric(profileEvent, topic, false,
sendTime);
return true;
}
try {
@@ -145,7 +146,7 @@ public class KafkaProducerCluster implements LifecycleAware
{
LOG.error(String.format("send failed, topic is %s,
partition is %s",
metadata.topic(), metadata.partition()),
ex);
tx.rollback();
- sinkContext.addSendResultMetric(profileEvent,
topic, true, sendTime);
+ sinkContext.addSendResultMetric(profileEvent,
topic, false, sendTime);
}
tx.close();
});
@@ -154,7 +155,7 @@ public class KafkaProducerCluster implements LifecycleAware
{
tx.rollback();
tx.close();
LOG.error(e.getMessage(), e);
- sinkContext.addSendResultMetric(profileEvent, topic, true,
sendTime);
+ sinkContext.addSendResultMetric(profileEvent, topic, false,
sendTime);
return false;
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
index 2fd854efa..458a22f4e 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -53,7 +53,7 @@ public class FetchCallback implements ReadCallback {
private static final Logger LOG =
LoggerFactory.getLogger(FetchCallback.class);
// SortId of fetch message.
- private final String sortId;
+ private final String sortTaskName;
// ChannelProcessor that put message in specific channel.
private final ChannelProcessor channelProcessor;
@@ -68,15 +68,15 @@ public class FetchCallback implements ReadCallback {
* Private constructor of {@link FetchCallback}.
* <p> The construction of FetchCallback should be initiated by {@link
FetchCallback.Factory}.</p>
*
- * @param sortId SortId of fetch message.
+ * @param sortTaskName SortId of fetch message.
* @param channelProcessor ChannelProcessor that message put in.
* @param context The context to report fetch results.
*/
private FetchCallback(
- final String sortId,
+ final String sortTaskName,
final ChannelProcessor channelProcessor,
final SortSdkSourceContext context) {
- this.sortId = sortId;
+ this.sortTaskName = sortTaskName;
this.channelProcessor = channelProcessor;
this.context = context;
}
@@ -105,23 +105,21 @@ public class FetchCallback implements ReadCallback {
Preconditions.checkState(messageRecord != null, "Fetched msg is
null.");
CacheMessageRecord cacheRecord = new
CacheMessageRecord(messageRecord, client);
for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
- //TODO fix here
final SubscribeFetchResult result =
SubscribeFetchResult.Factory
- .create(sortId, messageRecord.getMsgKey(),
messageRecord.getOffset(),
+ .create(sortTaskName, messageRecord.getMsgKey(),
messageRecord.getOffset(),
inLongMessage.getParams(),
messageRecord.getRecTime(),
inLongMessage.getBody());
final ProfileEvent profileEvent = new
ProfileEvent(result.getBody(), result.getHeaders(),
cacheRecord);
channelProcessor.processEvent(profileEvent);
- context.reportToMetric(profileEvent, sortId, "-",
SortSdkSourceContext.FetchResult.SUCCESS);
+ context.reportToMetric(profileEvent, sortTaskName, "-",
SortSdkSourceContext.FetchResult.SUCCESS);
}
-
-// client.ack(messageRecord.getMsgKey(), messageRecord.getOffset());
} catch (NullPointerException npe) {
- LOG.error("Got a null pointer exception for sortId " + sortId,
npe);
- context.reportToMetric(null, sortId, "-",
SortSdkSourceContext.FetchResult.FAILURE);
+ LOG.error("Got a null pointer exception for sort task " +
sortTaskName, npe);
+ context.reportToMetric(null, sortTaskName, "-",
SortSdkSourceContext.FetchResult.FAILURE);
} catch (Exception e) {
- LOG.error("Ack failed for sortId " + sortId, e);
+ LOG.error("Got exception of sort task " + sortTaskName, e);
+ context.reportToMetric(null, sortTaskName, "-",
SortSdkSourceContext.FetchResult.FAILURE);
}
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 42b8b34ff..9e15b71ef 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -151,23 +151,23 @@ public final class SortSdkSource extends AbstractSource
}
/**
- * Create one {@link SortClient} with specific sort id.
+ * Create one {@link SortClient} with specific sort task.
*
* <p>
* In current version, the {@link FetchCallback} will hold the client to
ACK. For more details see
* {@link FetchCallback#onFinished}
* </p>
*
- * @param sortId Sort in of new client.
+ * @param sortTaskName Sort in of new client.
* @return New sort client.
*/
- private SortClient newClient(final String sortId) {
- LOG.info("Start to new sort client for id: {}", sortId);
+ private SortClient newClient(final String sortTaskName) {
+ LOG.info("Start to new sort client for task: {}", sortTaskName);
try {
- final SortClientConfig clientConfig = new SortClientConfig(sortId,
this.sortClusterName,
+ final SortClientConfig clientConfig = new
SortClientConfig(sortTaskName, this.sortClusterName,
new DefaultTopicChangeListener(),
SortSdkSource.defaultStrategy,
InetAddress.getLocalHost().getHostAddress());
- final FetchCallback callback =
FetchCallback.Factory.create(sortId, getChannelProcessor(), context);
+ final FetchCallback callback =
FetchCallback.Factory.create(sortTaskName, getChannelProcessor(), context);
clientConfig.setCallback(callback);
// create SortClient
@@ -211,9 +211,9 @@ public final class SortSdkSource extends AbstractSource
callback.setClient(client);
return client;
} catch (UnknownHostException ex) {
- LOG.error("Got one UnknownHostException when init client of
id:{}", sortId, ex);
+ LOG.error("Got one UnknownHostException when init client of
id:{}", sortTaskName, ex);
} catch (Throwable th) {
- LOG.error("Got one throwable when init client of id:{}", sortId,
th);
+ LOG.error("Got one throwable when init client of id:{}",
sortTaskName, th);
}
return null;
}