Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 5b9eff836 -> 05b2b8987
MLHR-1912 #comment fixed style violations Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2fe4bec5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2fe4bec5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2fe4bec5 Branch: refs/heads/devel-3 Commit: 2fe4bec5343032863c204a3d90c79ec176684018 Parents: 5b9eff8 Author: Timothy Farkas <[email protected]> Authored: Thu Nov 19 01:14:23 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Thu Nov 19 11:11:42 2015 -0800 ---------------------------------------------------------------------- .../lib/db/jdbc/JdbcTransactionalStore.java | 12 +++--- .../lib/io/fs/AbstractFileOutputOperator.java | 45 +++++++++----------- 2 files changed, 24 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2fe4bec5/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java index 1d0f720..e4a7229 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; */ public class JdbcTransactionalStore extends JdbcStore implements TransactionableStore { - private static transient final Logger LOG = LoggerFactory.getLogger(JdbcTransactionalStore.class); + private static final transient Logger LOG = LoggerFactory.getLogger(JdbcTransactionalStore.class); public static String DEFAULT_APP_ID_COL = "dt_app_id"; public static String DEFAULT_OPERATOR_ID_COL = "dt_operator_id"; @@ -198,7 +198,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable Long lastWindow = getCommittedWindowIdHelper(appId, operatorId); try { - if(lastWindow == null) { + if (lastWindow == null) { lastWindowInsertCommand.close(); connection.commit(); } @@ -206,14 +206,12 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable lastWindowFetchCommand.close(); LOG.debug("Last window id: {}", lastWindow); - if(lastWindow == null) { + if (lastWindow == null) { return -1L; - } - else { + } else { return lastWindow; } - } - catch (SQLException ex) { + } catch (SQLException ex) { throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2fe4bec5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index fcbe1e8..2aa658f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -126,7 +126,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp /** * The default number of max open files. */ - public final static int DEFAULT_MAX_OPEN_FILES = 100; + public static final int DEFAULT_MAX_OPEN_FILES = 100; /** * Keyname to rolling file number. @@ -310,9 +310,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp { FileSystem tempFS = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration()); - if(tempFS instanceof LocalFileSystem) - { - tempFS = ((LocalFileSystem) tempFS).getRaw(); + if (tempFS instanceof LocalFileSystem) { + tempFS = ((LocalFileSystem)tempFS).getRaw(); } return tempFS; @@ -331,8 +330,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp //Getting required file system instance. try { fs = getFSInstance(); - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } @@ -375,7 +373,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp if (rollingFile) { //delete the left over future rolling files produced from the previous crashed instance of this operator. - for(String seenFileName: endOffsets.keySet()) { + for (String seenFileName : endOffsets.keySet()) { try { Integer fileOpenPart = this.openPart.get(seenFileName).getValue(); int nextPart = fileOpenPart + 1; @@ -723,12 +721,11 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp closeStream(fsFilterStreamContext); filesWithOpenStreams.remove(seenFileName); totalWritingTime += System.currentTimeMillis() - start; - } - catch (IOException ex) { + } catch (IOException ex) { //Count number of failures numberOfFailures++; //Add names of first N failed files to list - if(fileNames.size() < MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) { + if (fileNames.size() < MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) { fileNames.add(seenFileName); //save exception savedException = ex; @@ -741,8 +738,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp try { fs.close(); - } - catch (IOException ex) { + } catch (IOException ex) { //Closing file system failed savedException = ex; fsFailed = true; @@ -753,20 +749,20 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp String errorMessage = ""; //File system failed to close - if(fsFailed) { + if (fsFailed) { errorMessage += "Closing the fileSystem failed. "; } //Print names of atmost first N files that failed to close - if(!fileNames.isEmpty()) { + if (!fileNames.isEmpty()) { errorMessage += "The following files failed closing: "; } - for(String seenFileName: fileNames) { + for (String seenFileName: fileNames) { errorMessage += seenFileName + ", "; } - if(numberOfFailures > MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) { + if (numberOfFailures > MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) { errorMessage += (numberOfFailures - MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) + " more files failed."; } @@ -801,7 +797,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp totalBytesWritten += tupleBytes.length; MutableLong currentOffset = endOffsets.get(fileName); - if(currentOffset == null) { + if (currentOffset == null) { currentOffset = new MutableLong(0); endOffsets.put(fileName, currentOffset); } @@ -881,11 +877,10 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp */ protected void flush(FSDataOutputStream fsOutput) throws IOException { - if(fs instanceof LocalFileSystem || - fs instanceof RawLocalFileSystem) { + if (fs instanceof LocalFileSystem || + fs instanceof RawLocalFileSystem) { fsOutput.flush(); - } - else { + } else { fsOutput.hflush(); } } @@ -920,8 +915,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp * @param part The part number of the rolling file. * @return The rolling file name. */ - protected String getPartFileName(String fileName, - int part) + protected String getPartFileName(String fileName, int part) { return fileName + "." + part; } @@ -940,7 +934,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp currentWindow = windowId; } - @Override + @Override public void endWindow() { try { @@ -951,8 +945,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp totalWritingTime += System.currentTimeMillis() - start; //streamContext.resetFilter(); } - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); }
