This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ad81c2d2f823051b914022df71722a059df3ac4d Author: Xin Gong <[email protected]> AuthorDate: Fri Sep 2 14:14:03 2022 +0800 [INLONG-5762][Sort] Fix the computing for the Pulsar source metric (#5763) --- .../table/DynamicPulsarDeserializationSchema.java | 79 ++++++++++++++++++---- .../pulsar/withoutadmin/CallbackCollector.java | 47 +++++++++++++ 2 files changed, 111 insertions(+), 15 deletions(-) diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java index 38a97bf5c..7522a2e43 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java @@ -18,10 +18,11 @@ package org.apache.inlong.sort.pulsar.table; -import java.util.Arrays; -import java.util.HashSet; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource; import org.apache.flink.streaming.util.serialization.FlinkSchema; import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema; @@ -35,14 +36,21 @@ import org.apache.flink.util.Preconditions; import org.apache.inlong.audit.AuditImp; import org.apache.inlong.sort.base.Constants; import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + import static org.apache.inlong.sort.base.Constants.DELIMITER; /** @@ -118,7 +126,7 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< inlongGroupId = inlongMetricArray[0]; inlongStreamId = inlongMetricArray[1]; String nodeId = inlongMetricArray[2]; - sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup()); + sourceMetricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, getMetricGroup(context)); sourceMetricData.registerMetricsForNumBytesIn(); sourceMetricData.registerMetricsForNumBytesInPerSecond(); sourceMetricData.registerMetricsForNumRecordsIn(); @@ -132,6 +140,40 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< } + /** + * reflect get metricGroup + * + * @param context Contextual information that can be used during initialization. + * @return metric group that can be used to register new metrics with Flink and to create a nested hierarchy based + * on the group names. + */ + private MetricGroup getMetricGroup(DeserializationSchema.InitializationContext context) + throws NoSuchFieldException, IllegalAccessException { + MetricGroup metricGroup; + String className = "RuntimeContextDeserializationInitializationContextAdapter"; + String fieldName = "runtimeContext"; + Class runtimeContextDeserializationInitializationContextAdapter = null; + Class[] innerClazz = RuntimeContextInitializationContextAdapters.class.getDeclaredClasses(); + for (Class clazz : innerClazz) { + int mod = clazz.getModifiers(); + if (Modifier.isPrivate(mod)) { + if (className.equalsIgnoreCase(clazz.getSimpleName())) { + runtimeContextDeserializationInitializationContextAdapter = clazz; + break; + } + } + } + if (runtimeContextDeserializationInitializationContextAdapter != null) { + Field field = runtimeContextDeserializationInitializationContextAdapter.getDeclaredField(fieldName); + field.setAccessible(true); + RuntimeContext runtimeContext = (RuntimeContext) field.get(context); + metricGroup = runtimeContext.getMetricGroup(); + } else { + metricGroup = context.getMetricGroup(); + } + return metricGroup; + } + @Override public boolean isEndOfStream(RowData nextElement) { return false; @@ -146,11 +188,15 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< @Override public void deserialize(Message<RowData> message, Collector<RowData> collector) throws IOException { + AtomicLong counter = new AtomicLong(); // shortcut in case no output projection is required, // also not for a cartesian product with the keys if (keyDeserialization == null && !hasMetadata) { - valueDeserialization.deserialize(message.getData(), collector); - outputMetrics(message); + valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> { + counter.addAndGet(1L); + collector.collect(inputRow); + })); + outputMetrics(counter, message); return; } BufferingCollector keyCollector = new BufferingCollector(); @@ -168,27 +214,30 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< // collect tombstone messages in upsert mode by hand outputCollector.collect(null); } else { - valueDeserialization.deserialize(message.getData(), outputCollector); - outputMetrics(message); + valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> { + counter.addAndGet(1L); + outputCollector.collect(inputRow); + })); + outputMetrics(counter, message); } keyCollector.buffer.clear(); } - private void outputMetrics(Message<RowData> message) { + private void outputMetrics(AtomicLong counter, Message<RowData> message) { if (sourceMetricData != null) { - sourceMetricData.getNumRecordsIn().inc(1L); + sourceMetricData.getNumRecordsIn().inc(counter.get()); sourceMetricData.getNumBytesIn() .inc(message.getData().length); } if (auditImp != null) { auditImp.add( - Constants.AUDIT_SORT_INPUT, - inlongGroupId, - inlongStreamId, - System.currentTimeMillis(), - 1, - message.getData().length); + Constants.AUDIT_SORT_INPUT, + inlongGroupId, + inlongStreamId, + System.currentTimeMillis(), + counter.get(), + message.getData().length); } } diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java new file mode 100644 index 000000000..e61b7c3f7 --- /dev/null +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/CallbackCollector.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.pulsar.withoutadmin; + +import org.apache.flink.util.Collector; +import org.apache.flink.util.function.ThrowingConsumer; + +/** + * A collector supporting callback. + */ +public class CallbackCollector<T> implements Collector<T> { + + private final ThrowingConsumer<T, Exception> callback; + + public CallbackCollector(ThrowingConsumer<T, Exception> callback) { + this.callback = callback; + } + + @Override + public void collect(T t) { + try { + callback.accept(t); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + + } +}
