lirui-apache commented on a change in pull request #9721: [FLINK-14129][hive] HiveTableSource should implement ProjectableTable…
URL: https://github.com/apache/flink/pull/9721#discussion_r326846024
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
##########
@@ -203,30 +210,40 @@ protected void fetchNext() throws IOException {
 	}
 	@Override
-	public Row nextRecord(Row ignore) throws IOException {
+	public Row nextRecord(Row reuse) throws IOException {
 		if (reachedEnd()) {
 			return null;
 		}
-		Row row = new Row(rowArity);
 		try {
 			//Use HiveDeserializer to deserialize an object out of a Writable blob
 			Object hiveRowStruct = deserializer.deserialize(value);
-			int index = 0;
-			for (; index < structFields.size(); index++) {
-				StructField structField = structFields.get(index);
-				Object object = HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
-					structObjectInspector.getStructFieldData(hiveRowStruct, structField));
-				row.setField(index, object);
-			}
-			for (String partition : partitionColNames){
-				row.setField(index++, hiveTablePartition.getPartitionSpec().get(partition));
+			for (int i = 0; i < fields.length; i++) {
+				// set non-partition columns
+				if (fields[i] < structFields.size()) {
+					StructField structField = structFields.get(fields[i]);
+					Object object = HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
+						structObjectInspector.getStructFieldData(hiveRowStruct, structField));
+					reuse.setField(i, object);
+				}
 			}
-		} catch (Exception e){
+		} catch (Exception e) {
 			logger.error("Error happens when converting hive data type to flink data type.");
 			throw new FlinkHiveException(e);
 		}
+		if (!rowReused) {
Review comment:
> it's always triggered for the first row, but still have to be checked for all other remaining rows regardless?

Yes. But it's still slightly better than populating each partition column for every row.
> maybe we can add a Row member variable to HiveTableInputFormat, and precompute and set partition values for it based on projection, and just reuse that row in nextRecord()?
That's possible. My concern is that we have no guarantee that the fields of the row we return from `nextRecord` won't be manipulated by the framework or downstream operators. Although both solutions share this problem, the current implementation is a little more "compliant" with the interface contract -- namely, that the Row parameter is the instance to be reused.
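For illustration, the "populate partition columns only once" idea discussed above can be sketched roughly as below. This is a standalone sketch, not the actual Flink code: the class and method names are hypothetical, a plain `Object[]` stands in for Flink's `Row`, and the projection/partition bookkeeping mirrors the `fields`, `structFields`, `partitionColNames`, and `rowReused` names from the diff. Since partition values are constant for a given split, they only need to be written into the reused row on the first call.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the one-time partition-column population.
// An Object[] stands in for Flink's Row; names mirror the diff above.
class ReusedRowSketch {
	// becomes true after partition columns have been set once
	private boolean rowReused = false;
	// projected field indices; indices >= numNonPartitionFields are partition columns
	private final int[] fields;
	// analogue of structFields.size(): number of non-partition columns in the table
	private final int numNonPartitionFields;
	// partition column names, in table order
	private final List<String> partitionColNames;
	// analogue of hiveTablePartition.getPartitionSpec(): name -> constant value
	private final Map<String, Object> partitionSpec;

	ReusedRowSketch(int[] fields, int numNonPartitionFields,
			List<String> partitionColNames, Map<String, Object> partitionSpec) {
		this.fields = fields;
		this.numNonPartitionFields = numNonPartitionFields;
		this.partitionColNames = partitionColNames;
		this.partitionSpec = partitionSpec;
	}

	// Called once per record with the reused row. Partition values are constant
	// per split, so they are only written on the first call; subsequent calls
	// pay just the boolean check.
	Object[] fillPartitionColumns(Object[] reuse) {
		if (!rowReused) {
			for (int i = 0; i < fields.length; i++) {
				if (fields[i] >= numNonPartitionFields) {
					String partition = partitionColNames.get(fields[i] - numNonPartitionFields);
					reuse[i] = partitionSpec.get(partition);
				}
			}
			rowReused = true;
		}
		return reuse;
	}
}
```

The same caveat from the comment applies to the sketch: it is only safe as long as nothing downstream overwrites the partition fields of the reused row between calls.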
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services