This is an automated email from the ASF dual-hosted git repository.

adamjshook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 23cabbc7a487b8844cbbe48a89a26741b7a696d8
Merge: 2a58165 927e673
Author: Adam J. Shook <adamjsh...@gmail.com>
AuthorDate: Wed Feb 14 14:26:12 2018 -0500

    Merge branch '1.8'

 .../org/apache/accumulo/tserver/log/DfsLogger.java |   5 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  68 ++---
 .../apache/accumulo/tserver/logger/LogReader.java  |  43 +--
 .../tserver/replication/AccumuloReplicaSystem.java | 297 ++++++++++-----------
 4 files changed, 207 insertions(+), 206 deletions(-)

diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index fc72b98,ba5e488..c35a315
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -113,44 -113,46 +113,46 @@@ public class LogSorter 
         // the following call does not throw an exception if the file/dir does not exist
          fs.deleteRecursively(new Path(destPath));
  
-         DFSLoggerInputStreams inputStreams;
-         try {
-           inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
-         } catch (LogHeaderIncompleteException e) {
-           log.warn("Could not read header from write-ahead log {}. Not 
sorting.", srcPath);
-           // Creating a 'finished' marker will cause recovery to proceed 
normally and the
-           // empty file will be correctly ignored downstream.
-           fs.mkdirs(new Path(destPath));
-           writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
-           fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-           return;
-         }
- 
-         this.input = inputStreams.getOriginalInput();
-         this.decryptingInput = inputStreams.getDecryptingInputStream();
- 
-         final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
-         Thread.currentThread().setName("Sorting " + name + " for recovery");
-         while (true) {
-           final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+         try (final FSDataInputStream fsinput = fs.open(srcPath)) {
+           DFSLoggerInputStreams inputStreams;
            try {
-             long start = input.getPos();
-             while (input.getPos() - start < bufferSize) {
-               LogFileKey key = new LogFileKey();
-               LogFileValue value = new LogFileValue();
-               key.readFields(decryptingInput);
-               value.readFields(decryptingInput);
-               buffer.add(new Pair<>(key, value));
+             inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
+           } catch (LogHeaderIncompleteException e) {
 -            log.warn("Could not read header from write-ahead log " + srcPath 
+ ". Not sorting.");
++            log.warn("Could not read header from write-ahead log {}. Not 
sorting.", srcPath);
+             // Creating a 'finished' marker will cause recovery to proceed normally and the
+             // empty file will be correctly ignored downstream.
+             fs.mkdirs(new Path(destPath));
+             writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
+             fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+             return;
+           }
+ 
+           this.input = inputStreams.getOriginalInput();
+           this.decryptingInput = inputStreams.getDecryptingInputStream();
+ 
 -          final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
++          final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
+           Thread.currentThread().setName("Sorting " + name + " for recovery");
+           while (true) {
+             final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+             try {
+               long start = input.getPos();
+               while (input.getPos() - start < bufferSize) {
+                 LogFileKey key = new LogFileKey();
+                 LogFileValue value = new LogFileValue();
+                 key.readFields(decryptingInput);
+                 value.readFields(decryptingInput);
+                 buffer.add(new Pair<>(key, value));
+               }
+               writeBuffer(destPath, buffer, part++);
+               buffer.clear();
+             } catch (EOFException ex) {
+               writeBuffer(destPath, buffer, part++);
+               break;
              }
-             writeBuffer(destPath, buffer, part++);
-             buffer.clear();
-           } catch (EOFException ex) {
-             writeBuffer(destPath, buffer, part++);
-             break;
            }
+           fs.create(new Path(destPath, "finished")).close();
 -          log.info("Finished log sort " + name + " " + getBytesCopied() + " 
bytes " + part + " parts in " + getSortTime() + "ms");
++          log.info("Finished log sort {} {} bytes {} parts in {}ms", name, 
getBytesCopied(), part, getSortTime());
          }
-         fs.create(new Path(destPath, "finished")).close();
-         log.info("Finished log sort {} {} bytes {} parts in {}ms", name, 
getBytesCopied(), part, getSortTime());
        } catch (Throwable t) {
          try {
            // parent dir may not exist
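
The substance of the LogSorter hunk above is resource handling: the caller now opens the WAL stream itself and scopes it with try-with-resources, passing the open FSDataInputStream to DfsLogger.readHeaderAndReturnStream, so the stream is closed even when the header is incomplete or sorting fails partway. (The merge also resolves the 1.8 branch's conf.getMemoryInBytes call to master's conf.getAsBytes.) Below is a minimal standalone sketch of the try-with-resources pattern using only the Hadoop FileSystem API; the path argument and the 4-byte header read are illustrative placeholders, not the Accumulo internals.

    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WalStreamSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path srcPath = new Path(args[0]); // hypothetical WAL location

        // The stream closes automatically on both the early return and the
        // normal exit, which is what the merged code guarantees for fsinput.
        try (FSDataInputStream fsinput = fs.open(srcPath)) {
          byte[] header = new byte[4]; // stand-in for the real WAL header
          try {
            fsinput.readFully(header);
          } catch (EOFException e) {
            System.err.println("Incomplete header in " + srcPath + "; skipping.");
            return; // fsinput is still closed by try-with-resources
          }
          System.out.println("Header read; position = " + fsinput.getPos());
        }
      }
    }
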
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 479550f,fb286c4..76056b4
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@@ -101,28 -101,30 +102,30 @@@ public class LogReader 
        LogFileValue value = new LogFileValue();
  
        if (fs.isFile(path)) {
-         // read log entries from a simple hdfs file
-         DFSLoggerInputStreams streams;
-         try {
-           streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance());
-         } catch (LogHeaderIncompleteException e) {
-           log.warn("Could not read header for {}. Ignoring...", path);
-           continue;
-         }
-         DataInputStream input = streams.getDecryptingInputStream();
+         try (final FSDataInputStream fsinput = fs.open(path)) {
+           // read log entries from a simple hdfs file
+           DFSLoggerInputStreams streams;
+           try {
+             streams = DfsLogger.readHeaderAndReturnStream(fsinput, SiteConfiguration.getInstance());
+           } catch (LogHeaderIncompleteException e) {
 -            log.warn("Could not read header for " + path + ". Ignoring...");
++            log.warn("Could not read header for {} . Ignoring...", path);
+             continue;
+           }
+           DataInputStream input = streams.getDecryptingInputStream();
  
-         try {
-           while (true) {
-             try {
-               key.readFields(input);
-               value.readFields(input);
-             } catch (EOFException ex) {
-               break;
+           try {
+             while (true) {
+               try {
+                 key.readFields(input);
+                 value.readFields(input);
+               } catch (EOFException ex) {
+                 break;
+               }
+               printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
              }
-             printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+           } finally {
+             input.close();
            }
-         } finally {
-           input.close();
          }
        } else {
          // read the log entries sorted in a map file
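
Both files also move from string-concatenated log messages to SLF4J parameterized logging, as the paired " -"/"++" lines above show. A small self-contained sketch of that idiom (class name and path value are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
      private static final Logger log = LoggerFactory.getLogger(LoggingSketch.class);

      public static void main(String[] args) {
        String path = "/accumulo/wal/example"; // illustrative value

        // Concatenation builds the full message even when WARN is disabled:
        // log.warn("Could not read header for " + path + ". Ignoring...");

        // Placeholders defer formatting until the level check passes, and the
        // constant message template is friendlier to log aggregation:
        log.warn("Could not read header for {}. Ignoring...", path);
      }
    }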
