ChinmaySKulkarni commented on a change in pull request #746: PHOENIX-5802: 
Connection leaks in UPSERT SELECT/DELETE paths due to 
MutatingParallelIteratorFactory iterator not being closed
URL: https://github.com/apache/phoenix/pull/746#discussion_r399593781
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
 ##########
 @@ -50,62 +51,80 @@ protected 
MutatingParallelIteratorFactory(PhoenixConnection connection) {
     /**
      * Method that does the actual mutation work
      */
-    abstract protected MutationState mutate(StatementContext parentContext, 
ResultIterator iterator, PhoenixConnection connection) throws SQLException;
+    abstract protected MutationState mutate(StatementContext parentContext, 
ResultIterator iterator,
+            PhoenixConnection connection) throws SQLException;
     
     @Override
-    public PeekingResultIterator newIterator(final StatementContext 
parentContext, ResultIterator iterator, Scan scan, String tableName, QueryPlan 
plan) throws SQLException {
+    public PeekingResultIterator newIterator(final StatementContext 
parentContext,
+            ResultIterator iterator, Scan scan, String tableName,
+            QueryPlan plan) throws SQLException {
+
         final PhoenixConnection clonedConnection = new 
PhoenixConnection(this.connection);
-        
-        MutationState state = mutate(parentContext, iterator, 
clonedConnection);
-        
-        final long totalRowCount = state.getUpdateCount();
-        final boolean autoFlush = connection.getAutoCommit() || 
plan.getTableRef().getTable().isTransactional();
-        if (autoFlush) {
-            clonedConnection.getMutationState().join(state);
-            state = clonedConnection.getMutationState();
-        }
-        final MutationState finalState = state;
-        
-        byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
-        KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
-        final Tuple tuple = new SingleKeyValueTuple(keyValue);
-        return new PeekingResultIterator() {
-            private boolean done = false;
-            
-            @Override
-            public Tuple next() throws SQLException {
-                if (done) {
-                    return null;
-                }
-                done = true;
-                return tuple;
-            }
+        try {
+            MutationState state = mutate(parentContext, iterator, 
clonedConnection);
 
-            @Override
-            public void explain(List<String> planSteps) {
+            final long totalRowCount = state.getUpdateCount();
+            final boolean autoFlush = connection.getAutoCommit() ||
+                    plan.getTableRef().getTable().isTransactional();
+            if (autoFlush) {
+                clonedConnection.getMutationState().join(state);
+                state = clonedConnection.getMutationState();
             }
+            final MutationState finalState = state;
+
+            byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
+            KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+                    SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 
0, value.length);
+            final Tuple tuple = new SingleKeyValueTuple(keyValue);
+            return new PeekingResultIterator() {
+                private boolean done = false;
 
-            @Override
-            public void close() throws SQLException {
-                try {
-                    /* 
-                     * Join the child mutation states in close, since this is 
called in a single threaded manner
-                     * after the parallel results have been processed. 
-                     * If auto-commit is on for the cloned child connection, 
then the finalState here is an empty mutation 
-                     * state (with no mutations). However, it still has the 
metrics for mutation work done by the 
-                     * mutating-iterator. Joining the mutation state makes 
sure those metrics are passed over
-                     * to the parent connection.
-                     */ 
-                    
MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
-                } finally {
-                    clonedConnection.close();
+                @Override
+                public Tuple next() {
+                    if (done) {
+                        return null;
+                    }
+                    done = true;
+                    return tuple;
                 }
-            }
 
-            @Override
-            public Tuple peek() throws SQLException {
-                return done ? null : tuple;
+                @Override
+                public void explain(List<String> planSteps) {
+                }
+
+                @Override
+                public void close() throws SQLException {
+                    try {
+                        /*
+                         * Join the child mutation states in close, since this 
is called in a single
+                         * threaded manner after the parallel results have 
been processed.
+                         * If auto-commit is on for the cloned child 
connection, then the finalState
+                         * here is an empty mutation state (with no 
mutations). However, it still
+                         * has the metrics for mutation work done by the 
mutating-iterator.
+                         * Joining the mutation state makes sure those metrics 
are passed over
+                         * to the parent connection.
+                         */
+                        
MutatingParallelIteratorFactory.this.connection.getMutationState()
+                                .join(finalState);
+                    } finally {
+                        clonedConnection.close();
+                    }
+                }
+
+                @Override
+                public Tuple peek() {
+                    return done ? null : tuple;
+                }
+            };
+        } catch (Throwable ex) {
+            // Catch just to make sure we close the cloned connection and then 
rethrow
+            try {
+                // closeQuietly only handles IOException
+                clonedConnection.close();
+            } catch (SQLException ignore) {
+                // ignore
 
 Review comment:
   Will add 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to