clintropolis commented on code in PR #16813:
URL: https://github.com/apache/druid/pull/16813#discussion_r1699928058


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KinesisInputReader implements InputEntityReader
+{
+
+  private final InputRowSchema inputRowSchema;
+  private final SettableByteEntity<KinesisRecordEntity> source;
+  private final InputEntityReader valueParser;
+  private final String timestampColumnName;
+
+  public KinesisInputReader(
+      InputRowSchema inputRowSchema,
+      SettableByteEntity<KinesisRecordEntity> source,
+      InputEntityReader valueParser,
+      String timestampColumnName
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.valueParser = valueParser;
+    this.timestampColumnName = timestampColumnName;
+
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read() throws IOException
+  {
+    final KinesisRecordEntity record = source.getEntity();
+    final Map<String, Object> mergedHeaderMap = extractHeaders(record);
+
+    // Ignore tombstone records that have null values.
+    if (record.getRecord().getData() != null) {
+      return buildBlendedRows(valueParser, mergedHeaderMap);
+    } else {
+      return 
CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
+    }
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws 
IOException
+  {
+    final KinesisRecordEntity record = source.getEntity();
+    InputRowListPlusRawValues keysAndHeader = 
extractHeaderAndKeysSample(record);
+    if (record.getRecord().getData() != null) {
+      return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
+    } else {
+      final List<InputRowListPlusRawValues> rows = 
Collections.singletonList(keysAndHeader);
+      return CloseableIterators.withEmptyBaggage(rows.iterator());
+    }
+  }
+
+  private Map<String, Object> extractHeaders(KinesisRecordEntity record)

Review Comment:
   It seems like this method can only extract the timestamp from the record. Do 
you intend to add other fields to it? (See my other comment for a suggestion of 
what to do here instead.)



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KinesisInputReader implements InputEntityReader
+{
+
+  private final InputRowSchema inputRowSchema;
+  private final SettableByteEntity<KinesisRecordEntity> source;
+  private final InputEntityReader valueParser;
+  private final String timestampColumnName;
+
+  public KinesisInputReader(
+      InputRowSchema inputRowSchema,
+      SettableByteEntity<KinesisRecordEntity> source,
+      InputEntityReader valueParser,
+      String timestampColumnName
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.valueParser = valueParser;
+    this.timestampColumnName = timestampColumnName;
+
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read() throws IOException
+  {
+    final KinesisRecordEntity record = source.getEntity();
+    final Map<String, Object> mergedHeaderMap = extractHeaders(record);
+
+    // Ignore tombstone records that have null values.
+    if (record.getRecord().getData() != null) {
+      return buildBlendedRows(valueParser, mergedHeaderMap);
+    } else {
+      return 
CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
+    }
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws 
IOException
+  {
+    final KinesisRecordEntity record = source.getEntity();
+    InputRowListPlusRawValues keysAndHeader = 
extractHeaderAndKeysSample(record);
+    if (record.getRecord().getData() != null) {
+      return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
+    } else {
+      final List<InputRowListPlusRawValues> rows = 
Collections.singletonList(keysAndHeader);
+      return CloseableIterators.withEmptyBaggage(rows.iterator());
+    }
+  }
+
+  private Map<String, Object> extractHeaders(KinesisRecordEntity record)
+  {
+    final Map<String, Object> mergedHeaderMap = new HashMap<>();
+    // Add kinesis record timestamp to the mergelist, we will skip record 
timestamp if the same key exists already in
+    // the header list
+    mergedHeaderMap.putIfAbsent(timestampColumnName, 
record.getRecord().getApproximateArrivalTimestamp().getTime());

Review Comment:
   Don't need `putIfAbsent` since it's an empty map; a plain `put` is enough.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.DateTimes;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Objects;
+
+public class KinesisInputFormat implements InputFormat
+{
+  private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = 
"kinesis.timestamp";
+  public static final String DEFAULT_AUTO_TIMESTAMP_STRING = 
"__kif_auto_timestamp";
+
+  // Since KinesisInputFormat blends data from header, and payload, timestamp 
spec can be pointing to an attribute within one of these
+  // 2 sections. To handle scenarios where there is no timestamp value either 
in payload, we induce an artifical timestamp value
+  // to avoid unnecessary parser barf out. Users in such situations can use 
the inputFormat's kinesis record timestamp as its primary timestamp.
+  private final TimestampSpec dummyTimestampSpec = new 
TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH);
+
+  private final InputFormat valueFormat;
+  private final String timestampColumnName;
+
+  public KinesisInputFormat(
+      @JsonProperty("valueFormat") InputFormat valueFormat,
+      @JsonProperty("timestampColumnName") @Nullable String timestampColumnName

Review Comment:
   It looks like there are other fields on the record; why not expose them as 
well? You could do something like the Kafka input format and add a configurable 
prefix, mapping all of them under that prefix instead of specifying explicit 
names for each one (including the timestamp? Or I suppose you could still 
special-case that one, similar to Kafka, though headers and timestamp are 
stored differently there; it doesn't seem like there are free-form headers 
here, so it seems less necessary).
   
   Anyway, if you had a prefix, it could be optional and default to something 
like `kinesis.record.`, and then in `extractHeaders` of your reader, do 
something like
   
   ```
       mergedHeaderMap.put(headerPrefix + "approximateArrivalTimestamp", 
record.getRecord().getApproximateArrivalTimestamp().getTime());
       mergedHeaderMap.put(headerPrefix + "sequenceNumber", 
record.getRecord().getSequenceNumber());
       mergedHeaderMap.put(headerPrefix + "partitionKey", 
record.getRecord().getPartitionKey());
       mergedHeaderMap.put(headerPrefix + "encryptionType", 
record.getRecord().getEncryptionType());
   ```
   
   The alternative, I guess, would be to specify an explicit name for each of these properties.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.DateTimes;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Objects;
+
+public class KinesisInputFormat implements InputFormat
+{
+  private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = 
"kinesis.timestamp";
+  public static final String DEFAULT_AUTO_TIMESTAMP_STRING = 
"__kif_auto_timestamp";
+
+  // Since KinesisInputFormat blends data from header, and payload, timestamp 
spec can be pointing to an attribute within one of these
+  // 2 sections. To handle scenarios where there is no timestamp value either 
in payload, we induce an artifical timestamp value
+  // to avoid unnecessary parser barf out. Users in such situations can use 
the inputFormat's kinesis record timestamp as its primary timestamp.

Review Comment:
   nit: this copied comment doesn't make much sense here; please fix.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
+
+/**
+ * A {@link ByteEntity} generated by {@link KinesisRecordSupplier} and fed to 
any {@link InputFormat} used by kinesis
+ * indexing tasks.
+ * <p>
+ * It can be used as a regular ByteEntity, in which case the kinesis record 
value is returned, but the {@link #getRecord}
+ * method also allows Kinesis-aware {@link InputFormat} implementations to 
read the full kinesis record, including
+ * timestamp, encrytion key, patition key, and sequence number
+ * <p>
+ * NOTE: Any records with null values will be skipped, even if they contain 
non-null keys, or headers
+ * <p>
+ * This functionality is not yet exposed through any built-in InputFormats, 
but is available for use in extensions.

Review Comment:
   What does this mean? This is an extension, and extensions cannot depend on 
other extensions. I guess this is just a copy of the javadoc on 
`KafkaRecordEntity`, but it seems out of date since it predates 
`KafkaInputFormat` existing, so it really should be fixed there too...



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KinesisInputReader implements InputEntityReader
+{
+
+  private final InputRowSchema inputRowSchema;
+  private final SettableByteEntity<KinesisRecordEntity> source;
+  private final InputEntityReader valueParser;
+  private final String timestampColumnName;
+
+  public KinesisInputReader(
+      InputRowSchema inputRowSchema,
+      SettableByteEntity<KinesisRecordEntity> source,
+      InputEntityReader valueParser,
+      String timestampColumnName
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.valueParser = valueParser;
+    this.timestampColumnName = timestampColumnName;
+
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read() throws IOException
+  {
+    final KinesisRecordEntity record = source.getEntity();
+    final Map<String, Object> mergedHeaderMap = extractHeaders(record);
+
+    // Ignore tombstone records that have null values.
+    if (record.getRecord().getData() != null) {
+      return buildBlendedRows(valueParser, mergedHeaderMap);
+    } else {
+      return 
CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
+    }
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws 
IOException
+  {
+    final KinesisRecordEntity record = source.getEntity();
+    InputRowListPlusRawValues keysAndHeader = 
extractHeaderAndKeysSample(record);
+    if (record.getRecord().getData() != null) {
+      return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
+    } else {
+      final List<InputRowListPlusRawValues> rows = 
Collections.singletonList(keysAndHeader);
+      return CloseableIterators.withEmptyBaggage(rows.iterator());
+    }
+  }
+
+  private Map<String, Object> extractHeaders(KinesisRecordEntity record)
+  {
+    final Map<String, Object> mergedHeaderMap = new HashMap<>();
+    // Add kinesis record timestamp to the mergelist, we will skip record 
timestamp if the same key exists already in
+    // the header list

Review Comment:
   This comment doesn't really make sense because the map is empty... and it 
seems like there are no free-form headers in the Kinesis record, only a fixed 
set of properties (unless I'm missing something).



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KinesisInputReader implements InputEntityReader
+{
+
+  private final InputRowSchema inputRowSchema;
+  private final SettableByteEntity<KinesisRecordEntity> source;
+  private final InputEntityReader valueParser;
+  private final String timestampColumnName;
+
+  public KinesisInputReader(
+      InputRowSchema inputRowSchema,
+      SettableByteEntity<KinesisRecordEntity> source,
+      InputEntityReader valueParser,
+      String timestampColumnName
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.valueParser = valueParser;
+    this.timestampColumnName = timestampColumnName;
+
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read() throws IOException
+  {
+    final KinesisRecordEntity record = source.getEntity();
+    final Map<String, Object> mergedHeaderMap = extractHeaders(record);
+
+    // Ignore tombstone records that have null values.
+    if (record.getRecord().getData() != null) {
+      return buildBlendedRows(valueParser, mergedHeaderMap);
+    } else {
+      return 
CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
+    }
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws 
IOException
+  {
+    final KinesisRecordEntity record = source.getEntity();
+    InputRowListPlusRawValues keysAndHeader = 
extractHeaderAndKeysSample(record);
+    if (record.getRecord().getData() != null) {
+      return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
+    } else {
+      final List<InputRowListPlusRawValues> rows = 
Collections.singletonList(keysAndHeader);
+      return CloseableIterators.withEmptyBaggage(rows.iterator());
+    }
+  }
+
+  private Map<String, Object> extractHeaders(KinesisRecordEntity record)
+  {
+    final Map<String, Object> mergedHeaderMap = new HashMap<>();
+    // Add kinesis record timestamp to the mergelist, we will skip record 
timestamp if the same key exists already in
+    // the header list
+    mergedHeaderMap.putIfAbsent(timestampColumnName, 
record.getRecord().getApproximateArrivalTimestamp().getTime());
+
+    return mergedHeaderMap;
+  }
+
+  private CloseableIterator<InputRow> buildBlendedRows(
+      InputEntityReader valueParser,
+      Map<String, Object> headerKeyList
+  ) throws IOException
+  {
+    return valueParser.read().map(
+        r -> {
+          final HashSet<String> newDimensions = new 
HashSet<>(r.getDimensions());
+          final Map<String, Object> event = buildBlendedEventMap(r::getRaw, 
newDimensions, headerKeyList);
+          newDimensions.addAll(headerKeyList.keySet());
+          // Remove the dummy timestamp added in KinesisInputFormat
+          
newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
+
+          final DateTime timestamp = 
MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event);
+          return new MapBasedInputRow(
+              timestamp,
+              MapInputRowParser.findDimensions(
+                  inputRowSchema.getTimestampSpec(),
+                  inputRowSchema.getDimensionsSpec(),
+                  newDimensions
+              ),
+              event
+          );
+        }
+    );
+  }
+
+  private InputRowListPlusRawValues 
extractHeaderAndKeysSample(KinesisRecordEntity record)

Review Comment:
   nit: this doesn't have "keys" with their own separate format like Kafka does, 
so "Keys" probably shouldn't be in the method name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to