Jackie-Jiang commented on code in PR #12981:
URL: https://github.com/apache/pinot/pull/12981#discussion_r1578244090
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1780,6 +1780,31 @@ private void updateCurrentDocumentCountMetrics() {
}
}
+ /**
+ * Creates a {@link StreamMessageDecoder} using properties in {@link
StreamConfig}.
+ *
+ * @param streamConfig The stream config from the table config
+ * @param fieldsToRead The fields to read from the source stream
+ * @return The initialized StreamMessageDecoder
+ */
+ private StreamMessageDecoder create(Set<String> fieldsToRead) {
Review Comment:
(minor)
```suggestion
private StreamMessageDecoder createMessageDecoder(Set<String>
fieldsToRead) {
```
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1780,6 +1780,31 @@ private void updateCurrentDocumentCountMetrics() {
}
}
+ /**
+ * Creates a {@link StreamMessageDecoder} using properties in {@link
StreamConfig}.
+ *
+ * @param streamConfig The stream config from the table config
+ * @param fieldsToRead The fields to read from the source stream
+ * @return The initialized StreamMessageDecoder
+ */
+ private StreamMessageDecoder create(Set<String> fieldsToRead) {
+ String decoderClass = _streamConfig.getDecoderClass();
+ try {
+ Map<String, String> decoderProperties =
_streamConfig.getDecoderProperties();
+ StreamMessageDecoder decoder =
PluginManager.get().createInstance(decoderClass);
+ try {
+ decoder.init(decoderProperties, fieldsToRead,
_streamConfig.getTopicName());
+ } catch (UnsupportedOperationException e) {
+ // Backward compatibility
+ decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema);
+ }
Review Comment:
With the default fallback in the interface, we should just invoke the new
method
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java:
##########
@@ -46,8 +47,22 @@ public interface StreamMessageDecoder<T> {
* @param topicName Topic name of the stream
* @throws Exception If an error occurs
*/
- void init(Map<String, String> props, Set<String> fieldsToRead, String
topicName)
- throws Exception;
+ default void init(Map<String, String> props, Set<String> fieldsToRead,
String topicName)
+ throws Exception {
+ throw new UnsupportedOperationException("init method not implemented");
+ }
+
+ /**
+ * Initializes the decoder.
+ * @param streamConfig Can be derived from tableConfig but is passed
explicitly to avoid redundant computation
+ * @param tableConfig Table Config of the table
+ * @param schema Schema of the table
+ * @throws Exception
+ */
+ default void init(@Nullable Set<String> fields, StreamConfig streamConfig,
TableConfig tableConfig, Schema schema)
Review Comment:
Just realize `fields` won't be `null`
```suggestion
default void init(Set<String> fieldsToRead, StreamConfig streamConfig,
TableConfig tableConfig, Schema schema)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]