GitHub user uybhatti commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4638#discussion_r139665963
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
    @@ -106,8 +240,191 @@
                return deserializationSchema;
        }
     
    -   @Override
    -   public String explainSource() {
    -           return "";
    +   /**
    +    * Assigns ingestion time timestamps and watermarks.
    +    */
    +   public static class IngestionTimeWatermarkAssigner implements 
AssignerWithPeriodicWatermarks<Row> {
    +
    +           private long curTime = Long.MIN_VALUE;
    +
    +           @Override
    +           public long extractTimestamp(Row element, long 
previousElementTimestamp) {
    +                   long t = System.currentTimeMillis();
    +                   if (t > curTime) {
    +                           curTime = t;
    +                   }
    +                   return curTime;
    +           }
    +
    +           @Nullable
    +           @Override
    +           public Watermark getCurrentWatermark() {
    +                   return new Watermark(curTime - 1);
    +           }
    +   }
    +
    +   protected AssignerWithPeriodicWatermarks<Row> getAssigner() {
    +           return this.timestampAssigner;
    +   }
    +
    +   /**
    +    * Checks that the provided row time attribute is valid, determines its 
position in the schema,
    +    * and adjusts the return type.
    +    *
    +    * @param rowtime The attribute to check.
    +    */
    +   private void configureRowTimeAttribute(String rowtime) {
    +           Preconditions.checkNotNull(rowtime, "Row time attribute must 
not be null.");
    +
    +           if (this.ingestionTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "You can only specify a row time attribute OR 
an ingestion time attribute.");
    +           }
    +
    +           if (this.rowTimeAttribute != null) {
    +                   throw new ValidationException(
    +                           "Row time attribute can only be specified 
once.");
    +           }
    +
    +           // get current fields
    +           String[] fieldNames = ((RowTypeInfo) 
this.getReturnType()).getFieldNames();
    +           TypeInformation[] fieldTypes = ((RowTypeInfo) 
this.getReturnType()).getFieldTypes();
    +
    +           // check if the rowtime field exists and remember position
    +           this.rowtimeFieldPos = -1;
    +           for (int i = 0; i < fieldNames.length; i++) {
    +                   if (fieldNames[i].equals(rowtime)) {
    +                           if (fieldTypes[i] != Types.LONG) {
    +                                   throw new 
IllegalArgumentException("Specified rowtime field must be of type BIGINT. " +
    +                                           "Available fields: " + 
toSchemaString(fieldNames, fieldTypes));
    +                           }
    +                           this.rowtimeFieldPos = i;
    +                           break;
    +                   }
    +           }
    +           if (this.rowtimeFieldPos < 0) {
    +                   throw new IllegalArgumentException("Specified rowtime 
field must be present in data. " +
    +                           "Available fields: " + 
toSchemaString(fieldNames, fieldTypes));
    +           }
    +           this.rowTimeAttribute = rowtime;
    +
    +           // adjust result type by removing rowtime field (will be added 
later)
    +           String[] newNames = new String[fieldNames.length - 1];
    +           TypeInformation[] newTypes = new 
TypeInformation[fieldTypes.length - 1];
    +           for (int i = 0; i < rowtimeFieldPos; i++) {
    +                   newNames[i] = fieldNames[i];
    +                   newTypes[i] = fieldTypes[i];
    +           }
    +           for (int i = rowtimeFieldPos + 1; i < fieldNames.length; i++) {
    +                   newNames[i - 1] = fieldNames[i];
    +                   newTypes[i - 1] = fieldTypes[i];
    +           }
    +           this.typeInfo = new RowTypeInfo(newTypes, newNames);
    +   }
    +
    +   /**
    +    * Util method to create a schema description.
    +    *
    +    * @param fieldNames The names of the fields.
    +    * @param fieldTypes The types of the fields.
    +    * @return A string describing the schema of the given field 
information.
    +    */
    +   private String toSchemaString(String[] fieldNames, TypeInformation[] 
fieldTypes) {
    +           Preconditions.checkArgument(fieldNames.length == 
fieldTypes.length);
    +
    +           StringBuilder sb = new StringBuilder("[");
    +           for (int i = 0; i < fieldNames.length - 1; i++) {
    +                   sb.append(fieldNames[i]);
    +                   sb.append(": ");
    +                   if (fieldTypes[i] == Types.BOOLEAN) {
    +                           sb.append("BOOLEAN");
    +                   } else if (fieldTypes[i] == Types.BYTE) {
    +                           sb.append("TINYINT");
    +                   } else if (fieldTypes[i] == Types.SHORT) {
    +                           sb.append("SMALLINT");
    +                   } else if (fieldTypes[i] == Types.INT) {
    +                           sb.append("INTEGER");
    +                   } else if (fieldTypes[i] == Types.LONG) {
    +                           sb.append("BIGINT");
    +                   } else if (fieldTypes[i] == Types.FLOAT) {
    +                           sb.append("FLOAT");
    +                   } else if (fieldTypes[i] == Types.DOUBLE) {
    +                           sb.append("DOUBLE");
    +                   } else if (fieldTypes[i] == Types.STRING) {
    +                           sb.append("VARCHAR");
    +                   } else if (fieldTypes[i] == Types.DECIMAL) {
    +                           sb.append("DECIMAL");
    +                   } else if (fieldTypes[i] == Types.SQL_DATE) {
    +                           sb.append("DATE");
    +                   } else if (fieldTypes[i] == Types.SQL_TIME) {
    +                           sb.append("TIME");
    +                   } else if (fieldTypes[i] == Types.SQL_TIMESTAMP) {
    +                           sb.append("TIMESTAMP");
    +                   } else {
    +                           
sb.append(fieldTypes[i].getTypeClass().getSimpleName());
    +                   }
    +                   sb.append(", ");
    +           }
    +           sb.delete(sb.length() - 2, sb.length());
    +           sb.append("]");
    +           return sb.toString();
    +   }
    +
    +   /**
    +    * Assigns an existing field as timestamp and generates bounded 
out-of-order watermarks.
    +    */
    +   public static class RowFieldWatermarkAssigner implements 
AssignerWithPeriodicWatermarks<Row> {
    +
    +           private final int timeField;
    +           private final long delayMs;
    +           private long maxTime = Long.MIN_VALUE;
    +
    +           private RowFieldWatermarkAssigner(int timeFieldPos, long 
delayMs) {
    +                   Preconditions.checkArgument(delayMs >= 0, "Watermark 
delay must be positive.");
    +                   this.timeField = timeFieldPos;
    +                   this.delayMs = delayMs;
    +           }
    +
    +           @Override
    +           public long extractTimestamp(Row row, long 
previousElementTimestamp) {
    +                   long t = (long) row.getField(timeField);
    +                   if (t > maxTime) {
    +                           maxTime = t;
    +                   }
    +                   return t;
    +           }
    +
    +           @Nullable
    +           @Override
    +           public Watermark getCurrentWatermark() {
    +                   return new Watermark(maxTime - delayMs);
    +           }
        }
    +
    +   /**
    +    * Removes a field from a Row.
    +    */
    +   public static class FieldRemover implements MapFunction<Row, Row> {
    +
    +           private int fieldToRemove;
    +
    +           private FieldRemover(int fieldToRemove) {
    +                   this.fieldToRemove = fieldToRemove;
    +           }
    +
    +           @Override
    +           public Row map(Row value) throws Exception {
    +
    +                   Row out = new Row(value.getArity());
    --- End diff --
    
    It should be `Row out = new Row(value.getArity() - 1)`, because you are 
removing one field from the row.


---

Reply via email to