ngsg commented on code in PR #5983:
URL: https://github.com/apache/hive/pull/5983#discussion_r2212455123


##########
shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java:
##########
@@ -148,13 +150,32 @@ public boolean next(K key, V value) throws IOException {
   @Override
   public K createKey() {
-    K newKey = curReader.createKey();
-    return (K)(new CombineHiveKey(newKey));
+    skipCorruptfile = jc.getBoolean("hive.exec.skip.protobuf.corruptfile", false);
+    K newKey = null;
+    if (skipCorruptfile) {

Review Comment:
   Could we check `skipCorruptFile` inside the catch clause like this?
   https://github.com/apache/hive/blob/5512ffda602bef605ad144f9040676149cc2c960/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java#L2077-L2083
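
   For illustration, a minimal sketch of the shape I mean: keep a single `try`/`catch` and test the flag inside the catch clause. Names follow the diff above (with the flag camel-cased as `skipCorruptFile`); catching `RuntimeException` and falling back to a `CombineHiveKey` wrapping `null` are assumptions for the sketch, not the Vectorizer code linked above:

   ```java
   @Override
   public K createKey() {
     try {
       // normal path: delegate to the wrapped reader
       return (K) new CombineHiveKey(curReader.createKey());
     } catch (RuntimeException e) {
       if (!skipCorruptFile) {
         throw e;   // flag off: keep today's behaviour and surface the failure
       }
       LOG.warn("Failed to create key for current chunk, skipping it", e);
       return (K) new CombineHiveKey(null);   // placeholder key for the skipped chunk
     }
   }
   ```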

##########
shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java:
##########
@@ -249,26 +270,49 @@ protected boolean initNextRecordReader(K key) throws IOException {
       return false;
     }
-    // get a record reader for the idx-th chunk
-    try {
-      curReader = rrConstructor.newInstance(new Object[]
-          {split, jc, reporter, Integer.valueOf(idx), preReader});
-
-      // change the key if need be
-      if (key != null) {
-        K newKey = curReader.createKey();
-        ((CombineHiveKey)key).setKey(newKey);
+    if (skipCorruptfile) {
+      // get a record reader for the idx-th chunk
+      try {
+        curReader = rrConstructor.newInstance(new Object[]
+            {split, jc, reporter, Integer.valueOf(idx), preReader});
+
+        // change the key if need be
+        if (key != null) {
+          K newKey = curReader.createKey();
+          ((CombineHiveKey) key).setKey(newKey);
+        }
+
+        // setup some helper config variables.
+        jc.set("map.input.file", split.getPath(idx).toString());
+        jc.setLong("map.input.start", split.getOffset(idx));
+        jc.setLong("map.input.length", split.getLength(idx));
+      } catch (InvocationTargetException ITe) {
+        return false;
+      } catch (Exception e) {
+        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);

Review Comment:
   It appears that Hive already provides a way to recover from exceptions thrown by RR via `HiveIOExceptionHandlerUtil#handleRecordReaderCreationException`. Could this be a solution to your issue?
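
   For reference, that utility is driven by the `hive.io.exception.handlers` extension point, so (if I understand the mechanism correctly) a custom handler could be plugged in through configuration instead of adding a new branch here. A rough sketch, where `com.example.SkipCorruptProtobufHandler` is a placeholder name for a class implementing `HiveIOExceptionHandler`:

   ```java
   // Hypothetical wiring: the handler chain consulted by
   // HiveIOExceptionHandlerUtil#handleRecordReaderCreationException is built from this key.
   jc.set("hive.io.exception.handlers", "com.example.SkipCorruptProtobufHandler");
   ```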

##########
shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java:
##########
@@ -249,26 +270,49 @@ protected boolean initNextRecordReader(K key) throws IOException {
       return false;
     }
-    // get a record reader for the idx-th chunk
-    try {
-      curReader = rrConstructor.newInstance(new Object[]
-          {split, jc, reporter, Integer.valueOf(idx), preReader});
-
-      // change the key if need be
-      if (key != null) {
-        K newKey = curReader.createKey();
-        ((CombineHiveKey)key).setKey(newKey);
+    if (skipCorruptfile) {
+      // get a record reader for the idx-th chunk
+      try {
+        curReader = rrConstructor.newInstance(new Object[]
+            {split, jc, reporter, Integer.valueOf(idx), preReader});
+
+        // change the key if need be
+        if (key != null) {
+          K newKey = curReader.createKey();
+          ((CombineHiveKey) key).setKey(newKey);
+        }
+
+        // setup some helper config variables.
+        jc.set("map.input.file", split.getPath(idx).toString());
+        jc.setLong("map.input.start", split.getOffset(idx));
+        jc.setLong("map.input.length", split.getLength(idx));
+      } catch (InvocationTargetException ITe) {
+        return false;
+      } catch (Exception e) {
+        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
       }
-
-      // setup some helper config variables.
-      jc.set("map.input.file", split.getPath(idx).toString());
-      jc.setLong("map.input.start", split.getOffset(idx));
-      jc.setLong("map.input.length", split.getLength(idx));
-    } catch (Exception e) {
-      curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(
-          e, jc);
+    } else {
+      // get a record reader for the idx-th chunk
+      try {
+        curReader = rrConstructor.newInstance(new Object[]
+            {split, jc, reporter, Integer.valueOf(idx), preReader});
+
+        // change the key if need be
+        if (key != null) {
+          K newKey = curReader.createKey();
+          ((CombineHiveKey) key).setKey(newKey);
+        }
+
+        // setup some helper config variables.
+        jc.set("map.input.file", split.getPath(idx).toString());
+        jc.setLong("map.input.start", split.getOffset(idx));
+        jc.setLong("map.input.length", split.getLength(idx));
+      } catch (Exception e) {
+        curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(
+            e, jc);
+      }
+      idx++;

Review Comment:
   It seems that the skipCorruptFile-enabled path does not properly update `idx`. Could you check this?
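
   As a rough sketch of what I mean (and assuming the `while` loop in `next()` keeps calling `initNextRecordReader()` while `curReader` is `null`), the two branches could be collapsed so the counter advances exactly once per chunk, and a skipped chunk falls through to the next one instead of ending the whole split. The exception handling and messages below are illustrative, not a finished patch:

   ```java
   try {
     curReader = rrConstructor.newInstance(new Object[]
         {split, jc, reporter, Integer.valueOf(idx), preReader});
     if (key != null) {
       ((CombineHiveKey) key).setKey(curReader.createKey());
     }
     jc.set("map.input.file", split.getPath(idx).toString());
     jc.setLong("map.input.start", split.getOffset(idx));
     jc.setLong("map.input.length", split.getLength(idx));
   } catch (InvocationTargetException ite) {
     if (!skipCorruptFile) {
       throw new IOException(ite.getCause());
     }
     // skip this chunk but keep going; next() will move on to the following chunk
     LOG.warn("Could not open chunk " + split.getPath(idx) + ", skipping it", ite);
     curReader = null;
   } catch (Exception e) {
     curReader = HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(e, jc);
   }
   idx++;   // advance regardless of whether this chunk could be opened
   return true;
   ```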

##########
shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java:
##########
@@ -148,13 +150,32 @@ public boolean next(K key, V value) throws IOException {
   @Override
   public K createKey() {
-    K newKey = curReader.createKey();
-    return (K)(new CombineHiveKey(newKey));
+    skipCorruptfile = jc.getBoolean("hive.exec.skip.protobuf.corruptfile", false);
+    K newKey = null;
+    if (skipCorruptfile) {
+      try {
+        newKey = curReader.createKey();
+      } catch (NullPointerException e) {
+        LOG.info("=================Caught exception: " + e.getMessage() + ", skipping file======================");

Review Comment:
   I don't think we need the extra '=' characters in the log message. Could you simplify the message? Also, there's no corresponding log message in `createValue()`. I think these methods should either both emit logs or neither, for consistency.


##########
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:
##########
@@ -814,6 +814,10 @@ public static enum ConfVars {
         "Whether to use a native APIs for load queries to non-native table(like iceberg), if false uses a Tez job for" +
             " load queries"),
+    HIVE_SKIP_PROTOBUF_CORRUPTFILE("hive.exec.skip.protobuf.corruptfile", false,

Review Comment:
   - Could we start the config key with `hive.io` or something other than `hive.exec`? Since this does not affect the hive-exec module, I think the key should not start with `hive.exec`.
   - We probably don't need the '\n' at the end of the line. Also, could we change it to use 4-space indentation, like this?

   ```suggestion
       HIVE_SKIP_PROTOBUF_CORRUPTFILE("hive.exec.skip.protobuf.corruptfile", false,
           "Whether Hive skips Protocol Buffer files that are corrupted during parsing " +
           "(e.g., due to data truncation or file incompletion) to allow task continuation."),
   ```


##########
shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java:
##########
@@ -249,26 +270,49 @@ protected boolean initNextRecordReader(K key) throws IOException {
       return false;
     }
-    // get a record reader for the idx-th chunk
-    try {
-      curReader = rrConstructor.newInstance(new Object[]
-          {split, jc, reporter, Integer.valueOf(idx), preReader});
-
-      // change the key if need be
-      if (key != null) {
-        K newKey = curReader.createKey();
-        ((CombineHiveKey)key).setKey(newKey);
+    if (skipCorruptfile) {
+      // get a record reader for the idx-th chunk
+      try {
+        curReader = rrConstructor.newInstance(new Object[]
+            {split, jc, reporter, Integer.valueOf(idx), preReader});
+
+        // change the key if need be
+        if (key != null) {
+          K newKey = curReader.createKey();
+          ((CombineHiveKey) key).setKey(newKey);
+        }
+
+        // setup some helper config variables.
+        jc.set("map.input.file", split.getPath(idx).toString());
+        jc.setLong("map.input.start", split.getOffset(idx));
+        jc.setLong("map.input.length", split.getLength(idx));
+      } catch (InvocationTargetException ITe) {

Review Comment:
   - Could we use lowercase for `ITe`?
   - It looks like we skip all remaining splits once we encounter an `InvocationTargetException`. Shouldn't we proceed with them instead of just returning false?


##########
shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java:
##########
@@ -148,13 +150,32 @@ public boolean next(K key, V value) throws IOException {
   @Override
   public K createKey() {
-    K newKey = curReader.createKey();
-    return (K)(new CombineHiveKey(newKey));
+    skipCorruptfile = jc.getBoolean("hive.exec.skip.protobuf.corruptfile", false);
+    K newKey = null;
+    if (skipCorruptfile) {
+      try {
+        newKey = curReader.createKey();
+      } catch (NullPointerException e) {

Review Comment:
   I'm not sure about handling `NullPointerException` at this point. Can we say with certainty that a `NullPointerException` always results from a corrupted file? If not, I don't think we should swallow it.


##########
shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java:
##########
@@ -132,6 +133,7 @@ public static class CombineFileRecordReader<K, V> implements RecordReader<K, V>
     protected RecordReader<K, V> curReader;
    protected boolean isShrinked;
    protected long shrinkedLength;
+    protected boolean skipCorruptfile;

Review Comment:
   Could we change this to `private final boolean skipCorruptFile`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org