This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 099a86cff0 Add schema as input to the decoder. (#12981)
099a86cff0 is described below
commit 099a86cff0ad16a4d1a798efaf1b2118cf8e0cfb
Author: Rekha Seethamraju <[email protected]>
AuthorDate: Wed Apr 24 12:30:36 2024 -0700
Add schema as input to the decoder. (#12981)
---
.../realtime/RealtimeSegmentDataManager.java | 24 +++++++++-
.../pinot/spi/stream/StreamDecoderProvider.java | 52 ----------------------
.../pinot/spi/stream/StreamMessageDecoder.java | 21 +++++++--
3 files changed, 40 insertions(+), 57 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 6771e038d1..3290c5e4f3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -75,6 +75,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -91,7 +92,6 @@ import
org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDataDecoder;
import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
import org.apache.pinot.spi.stream.StreamDataDecoderResult;
-import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
@@ -1505,7 +1505,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// Create message decoder
Set<String> fieldsToRead =
IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(),
_schema);
try {
- StreamMessageDecoder streamMessageDecoder =
StreamDecoderProvider.create(_streamConfig, fieldsToRead);
+ StreamMessageDecoder streamMessageDecoder =
createMessageDecoder(fieldsToRead);
_streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
} catch (Exception e) {
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
@@ -1780,6 +1780,26 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
}
+ /**
+ * Creates a {@link StreamMessageDecoder} using properties in {@link
StreamConfig}.
+ *
+ * @param streamConfig The stream config from the table config
+ * @param fieldsToRead The fields to read from the source stream
+ * @return The initialized StreamMessageDecoder
+ */
+ private StreamMessageDecoder createMessageDecoder(Set<String> fieldsToRead) {
+ String decoderClass = _streamConfig.getDecoderClass();
+ try {
+ Map<String, String> decoderProperties =
_streamConfig.getDecoderProperties();
+ StreamMessageDecoder decoder =
PluginManager.get().createInstance(decoderClass);
+ decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema);
+ return decoder;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Caught exception while creating StreamMessageDecoder from stream
config: " + _streamConfig, e);
+ }
+ }
+
@Override
public MutableSegment getSegment() {
return _realtimeSegment;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
deleted file mode 100644
index fdb97093de..0000000000
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-import java.util.Map;
-import java.util.Set;
-import org.apache.pinot.spi.plugin.PluginManager;
-
-
-/**
- * Provider for {@link StreamMessageDecoder}
- */
-public class StreamDecoderProvider {
- private StreamDecoderProvider() {
- }
-
- /**
- * Creates a {@link StreamMessageDecoder} using properties in {@link
StreamConfig}.
- *
- * @param streamConfig The stream config from the table config
- * @param fieldsToRead The fields to read from the source stream
- * @return The initialized StreamMessageDecoder
- */
- public static StreamMessageDecoder create(StreamConfig streamConfig,
Set<String> fieldsToRead) {
- String decoderClass = streamConfig.getDecoderClass();
- Map<String, String> decoderProperties =
streamConfig.getDecoderProperties();
- try {
- StreamMessageDecoder decoder =
PluginManager.get().createInstance(decoderClass);
- decoder.init(decoderProperties, fieldsToRead,
streamConfig.getTopicName());
- return decoder;
- } catch (Exception e) {
- throw new RuntimeException(
- "Caught exception while creating StreamMessageDecoder from stream
config: " + streamConfig, e);
- }
- }
-}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
index 89312f06b6..b736e975d1 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
@@ -23,9 +23,10 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-
/**
* Interface for a decoder of messages fetched from the stream
* @param <T>
@@ -46,8 +47,22 @@ public interface StreamMessageDecoder<T> {
* @param topicName Topic name of the stream
* @throws Exception If an error occurs
*/
- void init(Map<String, String> props, Set<String> fieldsToRead, String
topicName)
- throws Exception;
+ default void init(Map<String, String> props, Set<String> fieldsToRead,
String topicName)
+ throws Exception {
+ throw new UnsupportedOperationException("init method not implemented");
+ }
+
+ /**
+ * Initializes the decoder.
+ * @param streamConfig Can be derived from tableConfig but is passed
explicitly to avoid redundant computation
+ * @param tableConfig Table Config of the table
+ * @param schema Schema of the table
+ * @throws Exception
+ */
+ default void init(Set<String> fieldsToRead, StreamConfig streamConfig,
TableConfig tableConfig, Schema schema)
+ throws Exception {
+ init(streamConfig.getDecoderProperties(), fieldsToRead,
streamConfig.getTopicName());
+ }
/**
* Decodes a row.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]