This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 32c9cad [HUDI-840] Avoid blank file created by HoodieLogFormatWriter (#1567)
32c9cad is described below
commit 32c9cad52c7eef1067f12867afc405a1624cef14
Author: hongdd <[email protected]>
AuthorDate: Tue Sep 29 23:02:15 2020 +0800
[HUDI-840] Avoid blank file created by HoodieLogFormatWriter (#1567)
---
.../common/table/log/HoodieLogFormatWriter.java | 121 ++++++++++++---------
.../common/functional/TestHoodieLogFormat.java | 9 +-
.../TestHoodieLogFormatAppendFailure.java | 6 +
3 files changed, 83 insertions(+), 53 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index d9c23b6..8909477 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -54,6 +54,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private final String logWriteToken;
private final String rolloverLogWriteToken;
private FSDataOutputStream output;
+ private boolean closed = false;
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not
sufficiently replicated yet";
/**
@@ -64,7 +65,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
* @param sizeThreshold
*/
HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer
bufferSize, Short replication, Long sizeThreshold,
- String logWriteToken, String rolloverLogWriteToken) throws IOException,
InterruptedException {
+ String logWriteToken, String rolloverLogWriteToken) {
this.fs = fs;
this.logFile = logFile;
this.sizeThreshold = sizeThreshold;
@@ -73,40 +74,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
this.logWriteToken = logWriteToken;
this.rolloverLogWriteToken = rolloverLogWriteToken;
addShutDownHook();
- Path path = logFile.getPath();
- if (fs.exists(path)) {
- boolean isAppendSupported =
StorageSchemes.isAppendSupported(fs.getScheme());
- if (isAppendSupported) {
- LOG.info(logFile + " exists. Appending to existing file");
- try {
- this.output = fs.append(path, bufferSize);
- } catch (RemoteException e) {
- LOG.warn("Remote Exception, attempting to handle or recover lease",
e);
- handleAppendExceptionOrRecoverLease(path, e);
- } catch (IOException ioe) {
- if (ioe.getMessage().toLowerCase().contains("not supported")) {
- // may still happen if scheme is viewfs.
- isAppendSupported = false;
- } else {
- /*
- * Before throwing an exception, close the outputstream,
- * to ensure that the lease on the log file is released.
- */
- close();
- throw ioe;
- }
- }
- }
- if (!isAppendSupported) {
- this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
- LOG.info("Append not supported.. Rolling over to " + logFile);
- createNewFile();
- }
- } else {
- LOG.info(logFile + " does not exist. Create a new file");
- // Block size does not matter as we will always manually autoflush
- createNewFile();
- }
}
public FileSystem getFs() {
@@ -122,16 +89,64 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
return sizeThreshold;
}
+ /**
+ * Lazily opens the output stream if needed for writing.
+ * @return OutputStream for writing to current log file.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private FSDataOutputStream getOutputStream() throws IOException,
InterruptedException {
+ if (this.output == null) {
+ Path path = logFile.getPath();
+ if (fs.exists(path)) {
+ boolean isAppendSupported =
StorageSchemes.isAppendSupported(fs.getScheme());
+ if (isAppendSupported) {
+ LOG.info(logFile + " exists. Appending to existing file");
+ try {
+ this.output = fs.append(path, bufferSize);
+ } catch (RemoteException e) {
+ LOG.warn("Remote Exception, attempting to handle or recover
lease", e);
+ handleAppendExceptionOrRecoverLease(path, e);
+ } catch (IOException ioe) {
+ if (ioe.getMessage().toLowerCase().contains("not supported")) {
+ // may still happen if scheme is viewfs.
+ isAppendSupported = false;
+ } else {
+ /*
+ * Before throwing an exception, close the outputstream,
+ * to ensure that the lease on the log file is released.
+ */
+ close();
+ throw ioe;
+ }
+ }
+ }
+ if (!isAppendSupported) {
+ this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
+ LOG.info("Append not supported.. Rolling over to " + logFile);
+ createNewFile();
+ }
+ } else {
+ LOG.info(logFile + " does not exist. Create a new file");
+ // Block size does not matter as we will always manually autoflush
+ createNewFile();
+ }
+ }
+ return output;
+ }
+
@Override
public Writer appendBlock(HoodieLogBlock block) throws IOException,
InterruptedException {
// Find current version
HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
- long currentSize = this.output.size();
+
+ FSDataOutputStream outputStream = getOutputStream();
+ long currentSize = outputStream.size();
// 1. Write the magic header for the start of the block
- this.output.write(HoodieLogFormat.MAGIC);
+ outputStream.write(HoodieLogFormat.MAGIC);
// bytes for header
byte[] headerBytes =
HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
@@ -141,27 +156,27 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
byte[] footerBytes =
HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
// 2. Write the total size of the block (excluding Magic)
- this.output.writeLong(getLogBlockLength(content.length,
headerBytes.length, footerBytes.length));
+ outputStream.writeLong(getLogBlockLength(content.length,
headerBytes.length, footerBytes.length));
// 3. Write the version of this log block
- this.output.writeInt(currentLogFormatVersion.getVersion());
+ outputStream.writeInt(currentLogFormatVersion.getVersion());
// 4. Write the block type
- this.output.writeInt(block.getBlockType().ordinal());
+ outputStream.writeInt(block.getBlockType().ordinal());
// 5. Write the headers for the log block
- this.output.write(headerBytes);
+ outputStream.write(headerBytes);
// 6. Write the size of the content block
- this.output.writeLong(content.length);
+ outputStream.writeLong(content.length);
// 7. Write the contents of the data block
- this.output.write(content);
+ outputStream.write(content);
// 8. Write the footers for the log block
- this.output.write(footerBytes);
+ outputStream.write(footerBytes);
// 9. Write the total size of the log block (including magic) which is
everything written
// until now (for reverse pointer)
// Update: this information is now used in determining if a block is
corrupt by comparing to the
// block size in header. This change assumes that the block size will be
the last data written
// to a block. Read will break if any data is written past this point
for a block.
- this.output.writeLong(this.output.size() - currentSize);
+ outputStream.writeLong(outputStream.size() - currentSize);
// Flush every block to disk
flush();
@@ -207,9 +222,12 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
@Override
public void close() throws IOException {
- flush();
- output.close();
- output = null;
+ if (output != null) {
+ flush();
+ output.close();
+ output = null;
+ closed = true;
+ }
}
private void flush() throws IOException {
@@ -224,9 +242,13 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
@Override
public long getCurrentSize() throws IOException {
- if (output == null) {
+ if (closed) {
throw new IllegalStateException("Cannot get current size as the
underlying stream has been closed already");
}
+
+ if (output == null) {
+ return 0;
+ }
return output.getPos();
}
@@ -302,5 +324,4 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
}
}
}
-
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 1d7ca97..fa25e17 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -145,6 +145,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertEquals(size,
fs.getFileStatus(writer.getLogFile().getPath()).getLen(),
"Write should be auto-flushed. The size reported by FileStatus and the
writer should match");
writer.close();
+
}
@ParameterizedTest
@@ -174,6 +175,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
writer = writer.appendBlock(dataBlock);
assertEquals(0, writer.getCurrentSize(), "This should be a new log file
and hence size should be 0");
assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be
rolled to 2");
+ Path logFilePath = writer.getLogFile().getPath();
+ assertFalse(fs.exists(logFilePath), "Path (" + logFilePath + ") must not
exist");
writer.close();
}
@@ -216,16 +219,16 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
builder1 =
builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
}
Writer writer = builder1.build();
- Writer writer2 = builder2.build();
- HoodieLogFile logFile1 = writer.getLogFile();
- HoodieLogFile logFile2 = writer2.getLogFile();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
getSimpleSchema().toString());
HoodieDataBlock dataBlock = getDataBlock(records, header);
writer = writer.appendBlock(dataBlock);
+ Writer writer2 = builder2.build();
writer2 = writer2.appendBlock(dataBlock);
+ HoodieLogFile logFile1 = writer.getLogFile();
+ HoodieLogFile logFile2 = writer2.getLogFile();
writer.close();
writer2.close();
assertNotNull(logFile1.getLogWriteToken());
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index 201ed4f..71616f6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
@@ -139,6 +140,11 @@ public class TestHoodieLogFormatAppendFailure {
writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
.overBaseCommit("").withFs(fs).build();
+ header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+
+ writer.appendBlock(new HoodieCommandBlock(header));
// The log version should be different for this new writer
assertNotEquals(writer.getLogFile().getLogVersion(), logFileVersion);
}