ArkoSharma commented on a change in pull request #1936:
URL: https://github.com/apache/hive/pull/1936#discussion_r580050377
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
##########
@@ -18,158 +18,176 @@
package org.apache.hadoop.hive.ql.exec.repl.util;
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Callable;
-
-/**
- * A file backed list of Strings which is in-memory till the threshold.
- */
public class FileList implements AutoCloseable, Iterator<String> {
private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
- private static int fileListStreamerID = 0;
- private static final String FILE_LIST_STREAMER_PREFIX =
"file-list-streamer-";
-
- private LinkedBlockingQueue<String> cache;
- private volatile boolean thresholdHit = false;
- private int thresholdPoint;
- private float thresholdFactor = 0.9f;
- private Path backingFile;
- private FileListStreamer fileListStreamer;
- private String nextElement;
- private boolean noMoreElement;
+ private final Path backingFile;
+ private String nextElement = null;
private HiveConf conf;
+ private volatile boolean retryMode;
private BufferedReader backingFileReader;
+ private volatile FSDataOutputStream backingFileWriter;
-
- public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+ public FileList(Path backingFile, HiveConf conf) {
this.backingFile = backingFile;
this.conf = conf;
- if (cacheSize > 0) {
- // Cache size must be > 0 for this list to be used for the write
operation.
- this.cache = new LinkedBlockingQueue<>(cacheSize);
- fileListStreamer = new FileListStreamer(cache, backingFile, conf);
- thresholdPoint = getThreshold(cacheSize);
- LOG.debug("File list backed by {} can be used for write operation.",
backingFile);
- } else {
- thresholdHit = true;
+ this.retryMode = false;
+ }
+
+ public void add(String entry) throws IOException {
+ Retryable retryable = buildRetryable();
+ try {
+ retryable.executeCallable((Callable<Void>) ()-> {
+ synchronized (backingFile) {
+ try{
+ if (backingFileWriter == null) {
+ backingFileWriter = initWriter();
+ }
+ backingFileWriter.writeBytes(getEntryWithNewline(entry));
+ backingFileWriter.hflush();
+ LOG.info("Writing entry {} to file list backed by {}", entry,
backingFile);
+ } catch (IOException e) {
+ LOG.error("Writing entry {} to file list {} failed, attempting
retry.", entry, backingFile, e);
+ this.retryMode = true;
+ close();
+ throw e;
+ }
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ throw new
IOException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()));
}
}
- @VisibleForTesting
- FileList(Path backingFile, FileListStreamer fileListStreamer,
LinkedBlockingQueue<String> cache, HiveConf conf) {
- this.backingFile = backingFile;
- this.fileListStreamer = fileListStreamer;
- this.cache = cache;
- this.conf = conf;
- thresholdPoint = getThreshold(cache.remainingCapacity());
+ Retryable buildRetryable() {
+ return Retryable.builder()
+ .withHiveConf(conf)
+ .withRetryOnException(IOException.class).build();
}
- /**
- * Only add operation is safe for concurrent operations.
- */
- public void add(String entry) throws SemanticException {
- if (thresholdHit && !fileListStreamer.isAlive()) {
- throw new SemanticException("List is not getting saved anymore to file "
+ backingFile.toString());
+ // Return the entry ensuring it ends with newline.
+ private String getEntryWithNewline(String entry) {
+ return new StringWriter()
+ .append(entry)
+ .append(System.lineSeparator())
+ .toString();
+ }
+
+ FSDataOutputStream initWriter() throws IOException {
+ if(shouldAppend()) {
+ return getWriterAppendMode(); // append in retry-mode if file has been
created already
+ }
+ else {
+ return getWriterCreateMode();
}
+ }
+
+ boolean shouldAppend() throws IOException {
+ return backingFile.getFileSystem(conf).exists(backingFile) &&
this.retryMode;
Review comment:
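In the case of bootstrap checkpointing, there could be an earlier file lying
around from the previous dump attempt. In that case we rewrite the file (the
writer is opened in create mode) rather than append to it.
The `retryMode` flag is introduced to distinguish that case from an ongoing
retry within the current dump, where the file already exists because this dump
created it and we must therefore append to it.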
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]