bowenli86 commented on a change in pull request #9721: [FLINK-14129][hive]
HiveTableSource should implement ProjectableTable…
URL: https://github.com/apache/flink/pull/9721#discussion_r326790235
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableInputFormat.java
##########
@@ -203,30 +210,40 @@ protected void fetchNext() throws IOException {
}
@Override
- public Row nextRecord(Row ignore) throws IOException {
+ public Row nextRecord(Row reuse) throws IOException {
if (reachedEnd()) {
return null;
}
- Row row = new Row(rowArity);
try {
//Use HiveDeserializer to deserialize an object out of a Writable blob
Object hiveRowStruct = deserializer.deserialize(value);
- int index = 0;
- for (; index < structFields.size(); index++) {
- StructField structField = structFields.get(index);
- Object object = HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
- structObjectInspector.getStructFieldData(hiveRowStruct, structField));
- row.setField(index, object);
- }
- for (String partition : partitionColNames){
- row.setField(index++, hiveTablePartition.getPartitionSpec().get(partition));
+ for (int i = 0; i < fields.length; i++) {
+ // set non-partition columns
+ if (fields[i] < structFields.size()) {
+ StructField structField = structFields.get(fields[i]);
+ Object object = HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
+ structObjectInspector.getStructFieldData(hiveRowStruct, structField));
+ reuse.setField(i, object);
+ }
}
- } catch (Exception e){
+ } catch (Exception e) {
logger.error("Error happens when converting hive data type to flink data type.");
throw new FlinkHiveException(e);
}
+ if (!rowReused) {
Review comment:
This looks like initialization logic to me: it only takes effect for the
first row, yet the `rowReused` check still has to run for every remaining row.
Maybe we can add a `Row` member variable to `HiveTableInputFormat`,
precompute and set the partition values on it based on the projection, and just
reuse that row in `nextRecord()`?
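
A minimal sketch of that idea, not the actual Flink code: `ProjectedRowReader`
is a hypothetical stand-in for the input format, and a plain `Object[]` stands
in for Flink's `Row`, so the snippet compiles on its own. It assumes, as the
diff does, that an index in `fields` below the data column count refers to a
data column and any other index refers to a partition column.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the input format; Object[] plays the role of Row.
final class ProjectedRowReader {
    private final int[] fields;        // projected field indices, as in the diff
    private final int dataColumnCount; // corresponds to structFields.size()
    private final Object[] row;        // member row, reused for every record

    ProjectedRowReader(int[] fields, int dataColumnCount,
            List<String> partitionColNames, Map<String, Object> partitionSpec) {
        this.fields = fields.clone();
        this.dataColumnCount = dataColumnCount;
        this.row = new Object[fields.length];
        // One-time initialization: partition values are constant per split,
        // so they are written into the reused row exactly once.
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] >= dataColumnCount) {
                String name = partitionColNames.get(fields[i] - dataColumnCount);
                row[i] = partitionSpec.get(name);
            }
        }
    }

    // Per-record path: only non-partition positions are overwritten,
    // so no "first row" flag needs to be checked here.
    Object[] nextRecord(Object[] deserialized) {
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] < dataColumnCount) {
                row[i] = deserialized[fields[i]];
            }
        }
        return row;
    }
}
```

Returning the same instance each time is only safe if callers do not hold on
to previously returned rows, so this would need to match how the input format
is consumed.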