lsyldliu commented on code in PR #26633:
URL: https://github.com/apache/flink/pull/26633#discussion_r2128655996
##########
flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java:
##########
@@ -196,16 +196,40 @@ private void testResumeAfterMultiplePersist(
final Map<String, RecoverableWriter.ResumeRecoverable> recoverables =
new HashMap<>(4);
RecoverableFsDataOutputStream stream = null;
try {
- stream = initWriter.open(path);
- recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
-
- stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+ try {
Review Comment:
I think we should simplify the print-log logic and only print it in the catch
block, as follows:
```
// This is just for locating the root cause:
// https://issues.apache.org/jira/browse/FLINK-37703
// After the fix, this logic should be reverted.
int branch = 0;
try {
    branch++;
    stream = initWriter.open(path);
    branch++;
    recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
    branch++;
    stream.write(testData1.getBytes(StandardCharsets.UTF_8));
    branch++;
    recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
    branch++;
    recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
    // and write some more data
    branch++;
    stream.write(testData2.getBytes(StandardCharsets.UTF_8));
    branch++;
    recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
} catch (IOException e) {
    LOG.info(
            "The exception branch was: {}, detailed exception msg: {}",
            branch,
            e.getMessage());
    throw e;
} finally {
```
##########
flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java:
##########
@@ -196,16 +196,40 @@ private void testResumeAfterMultiplePersist(
final Map<String, RecoverableWriter.ResumeRecoverable> recoverables =
new HashMap<>(4);
RecoverableFsDataOutputStream stream = null;
try {
- stream = initWriter.open(path);
- recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
-
- stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+ try {
+ stream = initWriter.open(path);
+ recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
+ } catch (IOException e) {
+ System.err.println("Unable to open file for writing " +
path.toString());
+ throw e;
+ }
- recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
- recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());
+ try {
+ stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ System.err.println("Initial write failed: " + e.getMessage());
Review Comment:
Can you use `LOG.info` to print the message?
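For reference, roughly what I have in mind (a sketch only — the class and helper
names are illustrative; Flink tests use SLF4J):
```
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.RecoverableFsDataOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LogInsteadOfStderrSketch {

    // SLF4J logger, declared the way Flink test classes usually do it.
    private static final Logger LOG =
            LoggerFactory.getLogger(LogInsteadOfStderrSketch.class);

    // Hypothetical helper: log through the test logger and rethrow,
    // instead of printing to System.err.
    static void writeOrLog(RecoverableFsDataOutputStream stream, String data)
            throws IOException {
        try {
            stream.write(data.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            LOG.info("Initial write failed: {}", e.getMessage());
            throw e;
        }
    }
}
```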
##########
flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java:
##########
@@ -196,16 +196,40 @@ private void testResumeAfterMultiplePersist(
final Map<String, RecoverableWriter.ResumeRecoverable> recoverables =
new HashMap<>(4);
RecoverableFsDataOutputStream stream = null;
try {
- stream = initWriter.open(path);
- recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
-
- stream.write(testData1.getBytes(StandardCharsets.UTF_8));
+ try {
+ stream = initWriter.open(path);
+ recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
+ } catch (IOException e) {
+ System.err.println("Unable to open file for writing " +
path.toString());
+ throw e;
Review Comment:
```
Apr 28 12:19:16 java.io.IOException: All datanodes [DatanodeInfoWithStorage[127.0.0.1:46278,DS-26d47d25-42de-4eef-a409-8a700a8bc82a,DISK]] are bad. Aborting...
Apr 28 12:19:16     at org.apache.hadoop.hdfs.DataStreamer.handleBadDatanode(DataStreamer.java:1537)
Apr 28 12:19:16     at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1472)
Apr 28 12:19:16     at org.apache.hadoop.hdfs.DataStreamer.processDatanodeError(DataStreamer.java:1244)
Apr 28 12:19:16     at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:663)
```
Based on the original error message, we need to figure out why the HDFS DataNode
is broken. Can throwing an exception on the client side uncover the root cause?
I don't know much about HDFS, so I'm not sure. Do we need to turn on server-side
logging when we bring up the HDFS cluster and observe the behavior on the server
side?
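If server-side observation turns out to be necessary, one option might be to
raise the relevant log levels programmatically before the mini cluster starts.
A rough sketch, assuming the test harness logs through log4j 1.x as Hadoop does
(logger names taken from the stack trace above):
```
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

class HdfsVerboseLoggingSketch {

    // Raise log levels before starting the test HDFS cluster so that both the
    // server-side DataNode and the client-side DataStreamer (which produced the
    // stack trace above) log their pipeline-recovery details.
    static void enableVerboseHdfsLogging() {
        Logger.getLogger("org.apache.hadoop.hdfs.server.datanode").setLevel(Level.DEBUG);
        Logger.getLogger("org.apache.hadoop.hdfs.DataStreamer").setLevel(Level.DEBUG);
    }
}
```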
##########
flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java:
##########
@@ -236,9 +260,13 @@ private void testResumeAfterMultiplePersist(
assertThat(fileContents.getKey().getName()).startsWith(".part-0.inprogress.");
assertThat(fileContents.getValue()).isEqualTo(expectedPostRecoveryContents);
}
-
- recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
- recoveredStream.closeForCommit().commit();
+ try {
Review Comment:
ditto.
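i.e. the same pattern as above, sketched for this block (assuming the same `LOG`
field):
```
try {
    recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
    recoveredStream.closeForCommit().commit();
} catch (IOException e) {
    LOG.info("Write/commit after recovery failed: {}", e.getMessage());
    throw e;
}
```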
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]