clintropolis commented on code in PR #16813:
URL: https://github.com/apache/druid/pull/16813#discussion_r1700901564


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.amazonaws.services.kinesis.model.Record;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.DateTimes;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Objects;
+
+/**
+ * Kinesis aware InputFormat. Allows for reading kinesis specific values that are stored in the {@link Record}. At
+ * this time, this input format only supports reading the main record payload ({@link Record#data}) and
+ * {@link Record#approximateArrivalTimestamp}, but can be extended easily to read other fields.
+ */
+public class KinesisInputFormat implements InputFormat
+{
+  private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp";
+  public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";
+
+  // Since KinesisInputFormat blends data from record timestamp, and payload, timestamp spec can be pointing to an attribute within one of these
+  // 2 sections. To handle scenarios where there is no timestamp value either in payload or record timestamp, we induce an artifical timestamp value
+  // to avoid unnecessary parser barf out. Users in such situations can use the inputFormat's kinesis record timestamp as its primary timestamp.

Review Comment:
   Is it really possible for `record.getApproximateArrivalTimestamp` to have no 
timestamp? The equivalent comment in Kafka exists because both the value and 
the key use their own `InputFormat` to parse their contents before being 
blended together with the headers, so the Kafka record timestamp is used to 
fill in when those other 2 input formats do not provide a timestamp. I think 
in this case only the value payload could potentially be missing a timestamp; 
it seems like `record.getApproximateArrivalTimestamp` should always be there.
   
   Also, nit, these comment lines are too long
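
To illustrate the first point, here is a minimal sketch of the fallback I have in mind, assuming the arrival timestamp is always present. The class and method names are hypothetical and not part of this PR; `java.util.Date` stands in for the value returned by `record.getApproximateArrivalTimestamp`, and the payload timestamp is modeled as a nullable `Long` purely for illustration.

```java
import java.util.Date;
import java.util.Objects;

// Hypothetical sketch, not code from this PR.
public class TimestampFallbackSketch
{
  /**
   * Picks the timestamp for a row: the payload timestamp when the payload
   * carries one, otherwise the record's approximate arrival timestamp.
   * Assumption: the arrival timestamp is never null, so no artificial
   * placeholder timestamp is needed.
   */
  public static long resolveTimestampMillis(Long payloadTimestampMillis, Date approximateArrivalTimestamp)
  {
    if (payloadTimestampMillis != null) {
      return payloadTimestampMillis;
    }
    // Fail loudly if the assumption above turns out to be wrong.
    return Objects.requireNonNull(
        approximateArrivalTimestamp,
        "approximateArrivalTimestamp was expected to be present"
    ).getTime();
  }

  public static void main(String[] args)
  {
    Date arrival = new Date(1_700_000_000_000L);
    // Payload has no timestamp: fall back to the arrival timestamp.
    System.out.println(resolveTimestampMillis(null, arrival));
    // Payload has a timestamp: use it as-is.
    System.out.println(resolveTimestampMillis(42L, arrival));
  }
}
```

If the arrival timestamp can in fact be null, the comment in the diff should say when that happens.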



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

