Jackie-Jiang commented on a change in pull request #4758: Bug fixing: Json
decoder extracts wrong time field value if schema contains outgoingTimeFieldSpec
URL: https://github.com/apache/incubator-pinot/pull/4758#discussion_r340756444
##########
File path:
pinot-connectors/pinot-connector-kafka-base/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaJSONMessageDecoder.java
##########
@@ -33,21 +35,30 @@
public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaJSONMessageDecoder.class);
- private Schema schema;
+ private Schema _schema;
+ private FieldSpec _incomingTimeFieldSpec;
@Override
public void init(Map<String, String> props, Schema indexingSchema, String
topicName)
throws Exception {
- this.schema = indexingSchema;
+ _schema = indexingSchema;
+
+ // For time field, we use the incoming time field spec
+ TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec();
+ if (timeFieldSpec != null) {
+ _incomingTimeFieldSpec = new
TimeFieldSpec(timeFieldSpec.getIncomingGranularitySpec());
+ }
}
@Override
public GenericRow decode(byte[] payload, GenericRow destination) {
try {
JsonNode message = JsonUtils.bytesToJsonNode(payload);
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- String column = fieldSpec.getName();
- destination.putField(column,
JsonUtils.extractValue(message.get(column), fieldSpec));
+ for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+ FieldSpec incomingFieldSpec =
+ fieldSpec.getFieldType() == FieldSpec.FieldType.TIME ?
_incomingTimeFieldSpec : fieldSpec;
+ String column = incomingFieldSpec.getName();
+ destination.putValue(column,
JsonUtils.extractValue(message.get(column), fieldSpec));
Review comment:
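`incomingFieldSpec`? (Should the second argument to `JsonUtils.extractValue` be `incomingFieldSpec` rather than `fieldSpec`? `column` is taken from the incoming spec, but the value is still extracted with the original field spec.)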
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]