navina commented on code in PR #10995: URL: https://github.com/apache/pinot/pull/10995#discussion_r1247212492
########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java: ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.pulsar; + +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pulsar.client.api.Message; + + +public interface PulsarMetadataExtractor { + static PulsarMetadataExtractor build(boolean populateMetadata, boolean setRecordTimeFromEventTime) { + return message -> { + long eventTime = message.getEventTime(); + long publishTime = message.getPublishTime(); + long recordTimestamp = setRecordTimeFromEventTime ? 
eventTime : publishTime; + + Map<String, String> metadataMap = populateMetadataMap(populateMetadata, message); + + if (!populateMetadata) { + return new PulsarStreamMessageMetadata(recordTimestamp, null, metadataMap); + } + GenericRow headerGenericRow = buildGenericRow(message); + return new PulsarStreamMessageMetadata(recordTimestamp, headerGenericRow, metadataMap); + }; + } + + RowMetadata extract(Message<?> record); + + static GenericRow buildGenericRow(Message<?> message) { + if (MapUtils.isEmpty(message.getProperties())) { + return null; + } + GenericRow genericRow = new GenericRow(); + for (Map.Entry<String, String> entry : message.getProperties().entrySet()) { + genericRow.putValue(entry.getKey(), entry.getValue()); + } + return genericRow; + } + + static Map<String, String> populateMetadataMap(boolean populateAllFields, Message<?> message) { + long eventTime = message.getEventTime(); + long publishTime = message.getPublishTime(); + + Map<String, String> metadataMap = new HashMap<>(); + if (eventTime > 0) { + metadataMap.put(PulsarStreamMessageMetadata.EVENT_TIME_KEY, String.valueOf(eventTime)); + } + if (publishTime > 0) { + metadataMap.put(PulsarStreamMessageMetadata.PUBLISH_TIME_KEY, String.valueOf(publishTime)); + } + + message.getBrokerPublishTime().ifPresent( + brokerPublishTime -> metadataMap.put(PulsarStreamMessageMetadata.BROKER_PUBLISH_TIME_KEY, + String.valueOf(brokerPublishTime))); + + String key = message.getKey(); + if (StringUtils.isNotBlank(key)) { + metadataMap.put(PulsarStreamMessageMetadata.MESSAGE_KEY_KEY, key); + } + String messageIdStr = message.getMessageId().toString(); + metadataMap.put(PulsarStreamMessageMetadata.MESSAGE_ID_KEY, messageIdStr); + byte[] messageIdBytes = message.getMessageId().toByteArray(); + metadataMap.put(PulsarStreamMessageMetadata.MESSAGE_ID_BYTES_B64_KEY, Base64.encodeBase64String(messageIdBytes)); + + //From Kafka and Kensis metadata extractors we seem to still populate some timestamps, Review Comment: Yes. 
The ingestion engine uses the record timestamp to calculate lags/delays. That's why we add some metadata about the timestamp. ########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessage.java: ########## @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.pulsar; + +import javax.annotation.Nullable; +import org.apache.pinot.spi.stream.StreamMessage; +import org.apache.pulsar.client.api.MessageId; + +public class PulsarStreamMessage extends StreamMessage<byte[]> { + + private final MessageId _messageId; Review Comment: is this member variable needed when the same messageId is also added as metadata ? ########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java: ########## @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.plugin.stream.pulsar; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageMetadata; +public class PulsarStreamMessageMetadata extends StreamMessageMetadata { Review Comment: nit: new line ########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMessageMetadata.java: ########## @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.plugin.stream.pulsar; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.StreamMessageMetadata; +public class PulsarStreamMessageMetadata extends StreamMessageMetadata { + public static final String PUBLISH_TIME_KEY = "publishTime"; Review Comment: can you add a link to what these metadata represent? perhaps in a javadoc for this class? ########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java: ########## @@ -81,9 +81,9 @@ public GenericRow next(GenericRow destination) { // Log every minute or 100k events if (now - _lastLogTime > 60000 || _currentCount - _lastCount >= 100000) { if (_lastCount == 0) { - _logger.info("Consumed {} events from kafka stream {}", _currentCount, _streamConfig.getTopicName()); + _logger.info("Consumed {} events from pulsar stream {}", _currentCount, _streamConfig.getTopicName()); Review Comment: lol :)) tks for fixing this. ########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java: ########## @@ -46,6 +46,8 @@ public class PulsarConfig { private final String _authenticationToken; private final String _tlsTrustCertsFilePath; private final boolean _enableKeyValueStitch; Review Comment: I think this can be marked as deprecated now that all the key/values/metadata is parsed into the `PulsarStreamMessage` instance ########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMetadataExtractor.java: ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.pulsar; + +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pulsar.client.api.Message; + + +public interface PulsarMetadataExtractor { + static PulsarMetadataExtractor build(boolean populateMetadata, boolean setRecordTimeFromEventTime) { + return message -> { + long eventTime = message.getEventTime(); + long publishTime = message.getPublishTime(); + long recordTimestamp = setRecordTimeFromEventTime ? 
eventTime : publishTime; + + Map<String, String> metadataMap = populateMetadataMap(populateMetadata, message); + + if (!populateMetadata) { + return new PulsarStreamMessageMetadata(recordTimestamp, null, metadataMap); + } + GenericRow headerGenericRow = buildGenericRow(message); + return new PulsarStreamMessageMetadata(recordTimestamp, headerGenericRow, metadataMap); + }; + } + + RowMetadata extract(Message<?> record); + + static GenericRow buildGenericRow(Message<?> message) { + if (MapUtils.isEmpty(message.getProperties())) { + return null; + } + GenericRow genericRow = new GenericRow(); + for (Map.Entry<String, String> entry : message.getProperties().entrySet()) { + genericRow.putValue(entry.getKey(), entry.getValue()); + } + return genericRow; + } + + static Map<String, String> populateMetadataMap(boolean populateAllFields, Message<?> message) { Review Comment: It's interesting that Pulsar has so much metadata in each record. Do you see any value in making the list of metadata keys to be parsed configurable for the Pulsar consumer?
########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java: ########## @@ -56,26 +50,19 @@ public int getMessageCount() { } @Override - public byte[] getMessageAtIndex(int index) { - Message<byte[]> msg = _messageList.get(index); - if (_enableKeyValueStitch) { - return stitchKeyValue(msg.getKeyBytes(), msg.getData()); - } - return msg.getData(); + public PulsarStreamMessage getMessageAtIndex(int index) { + return _messageList.get(index); } @Override public int getMessageOffsetAtIndex(int index) { - return ByteBuffer.wrap(_messageList.get(index).getData()).arrayOffset(); + return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset(); } @Override public int getMessageLengthAtIndex(int index) { - if (_enableKeyValueStitch) { - Message<byte[]> msg = _messageList.get(index); - return 8 + msg.getKeyBytes().length + msg.getData().length; - } - return _messageList.get(index).getData().length; + return _messageList.get(index).getValue().length; //if _enableKeyValueStitch is true, then they are already stitched Review Comment: nit: s/they are already stitched/they are already stitched in the consumer/ ########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java: ########## @@ -46,6 +46,8 @@ public class PulsarConfig { private final String _authenticationToken; private final String _tlsTrustCertsFilePath; private final boolean _enableKeyValueStitch; + private final boolean _populateMetadata; + private final boolean _populateRecordTimeFromEventTime; Review Comment: nit: Why is the `_populateRecordTimeFromEventTime` configuration needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
