NIFI-396 added a DisableOnCloseInputStream class; modified StandardProcessSession to prevent access to the Input/OutputStreams after callbacks have been executed; updated tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e2760f8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e2760f8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e2760f8c Branch: refs/heads/NIFI-250 Commit: e2760f8c980583d285137134e05c435c930fb4d2 Parents: 7272d0d Author: Bobby Owolabi <[email protected]> Authored: Thu Mar 19 00:54:24 2015 -0400 Committer: Bobby Owolabi <[email protected]> Committed: Thu Mar 19 00:54:24 2015 -0400 ---------------------------------------------------------------------- .../repository/StandardProcessSession.java | 11 +- .../io/DisableOnCloseInputStream.java | 93 ++++++ .../repository/TestStandardProcessSession.java | 295 +++++++++++-------- 3 files changed, 266 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e2760f8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 8d2e456..e5cd03e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -46,6 +46,7 @@ import org.apache.nifi.controller.ProcessorNode; import 
org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.io.ByteCountingInputStream; import org.apache.nifi.controller.repository.io.ByteCountingOutputStream; +import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream; import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream; @@ -1735,7 +1736,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset()); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead)) { + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) { // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from // Processor code. 
As a result, as have the FlowFileAccessInputStream that catches IOException from the repository @@ -2180,9 +2182,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset()); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); - final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead); - final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { + final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); + final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); + final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream); + final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { recursionSet.add(source); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e2760f8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java new file mode 100644 index 0000000..ddcf6c9 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/DisableOnCloseInputStream.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository.io; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Wraps an existing InputStream, so that when {@link InputStream#close()} is + * called, the underlying InputStream is NOT closed but this InputStream can no + * longer be read from + */ +public class DisableOnCloseInputStream extends InputStream { + + private final InputStream wrapped; + private boolean closed = false; + + public DisableOnCloseInputStream(final InputStream wrapped) { + this.wrapped = wrapped; + } + + @Override + public int read() throws IOException { + checkClosed(); + return wrapped.read(); + } + + @Override + public int read(byte[] b) throws IOException { + checkClosed(); + return wrapped.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkClosed(); + return wrapped.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + checkClosed(); + return wrapped.skip(n); + } + + @Override + public int available() throws IOException { + return wrapped.available(); + } + + private void checkClosed() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + } + + @Override + public 
void close() throws IOException { + closed = true; + } + + @Override + public void mark(int readlimit) { + if (closed == false) { + wrapped.mark(readlimit); + } + } + + @Override + public synchronized void reset() throws IOException { + checkClosed(); + wrapped.reset(); + } + + @Override + public boolean markSupported() { + return wrapped.markSupported(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e2760f8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 2d09ea5..ef2fb93 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -66,7 +66,6 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.ObjectHolder; import org.junit.After; import org.junit.Assert; @@ -238,7 +237,61 @@ public class TestStandardProcessSession { assertEquals(0, contentRepo.getExistingClaims().size()); } - @Test(expected = FlowFileAccessException.class) + private void assertDisabled(final 
OutputStream outputStream) { + try { + outputStream.write(new byte[0]); + Assert.fail("Expected OutputStream to be disabled; was able to call write(byte[])"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + outputStream.write(0); + Assert.fail("Expected OutputStream to be disabled; was able to call write(int)"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + outputStream.write(new byte[0], 0, 0); + Assert.fail("Expected OutputStream to be disabled; was able to call write(byte[], int, int)"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + } + + private void assertDisabled(final InputStream inputStream) { + try { + inputStream.read(); + Assert.fail("Expected InputStream to be disabled; was able to call read()"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + inputStream.read(new byte[0]); + Assert.fail("Expected InputStream to be disabled; was able to call read(byte[])"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + inputStream.read(new byte[0], 0, 0); + Assert.fail("Expected InputStream to be disabled; was able to call read(byte[], int, int)"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + inputStream.reset(); + Assert.fail("Expected InputStream to be disabled; was able to call reset()"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + try { + inputStream.skip(1L); + Assert.fail("Expected InputStream to be disabled; was able to call skip(long)"); + } catch (final Exception ex) { + Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); + } + } + + @Test public void 
testAppendAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -256,12 +309,10 @@ public class TestStandardProcessSession { outputStreamHolder.set(outputStream); } }); - try (final OutputStream outputStream = outputStreamHolder.get()) { - outputStream.write(5); - } + assertDisabled(outputStreamHolder.get()); } - @Test(expected = FlowFileAccessException.class) + @Test public void testReadAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -279,9 +330,7 @@ public class TestStandardProcessSession { inputStreamHolder.set(inputStream); } }); - try (final InputStream inputStream = inputStreamHolder.get()) { - inputStream.read(); - } + assertDisabled(inputStreamHolder.get()); } @Test @@ -304,17 +353,11 @@ public class TestStandardProcessSession { outputStreamHolder.set(output); } }); - try (final InputStream inputStream = inputStreamHolder.get()) { - inputStream.read(); - Assert.fail("Expected Exception to be thrown when read is attempted after session closes stream"); - } catch (final Exception ex) {} - try (final OutputStream outputStream = outputStreamHolder.get()) { - outputStream.write(5); - Assert.fail("Expected Exception to be thrown when write is attempted after session closes stream"); - } catch (final Exception ex) {} - } + assertDisabled(inputStreamHolder.get()); + assertDisabled(outputStreamHolder.get()); + } - @Test(expected = FlowFileAccessException.class) + @Test public void testWriteAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false); final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -332,9 +375,7 @@ public class TestStandardProcessSession { outputStreamHolder.set(out); } }); - try (final OutputStream outputStream = 
outputStreamHolder.get()) { - outputStream.write(5); - } + assertDisabled(outputStreamHolder.get()); } @Test @@ -385,7 +426,6 @@ public class TestStandardProcessSession { assertEquals(0, provenanceRepo.getEvents(0L, 100000).size()); } - @Test public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -425,59 +465,59 @@ public class TestStandardProcessSession { @Test public void testUpdateAttributesThenJoin() throws IOException { final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") - .entryDate(System.currentTimeMillis()) - .build(); - + .id(1L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") + .entryDate(System.currentTimeMillis()) + .build(); + final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .id(2L) - .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") - .entryDate(System.currentTimeMillis()) - .build(); - + .id(2L) + .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord1); flowFileQueue.put(flowFileRecord2); - + FlowFile ff1 = session.get(); FlowFile ff2 = session.get(); ff1 = session.putAttribute(ff1, "index", "1"); ff2 = session.putAttribute(ff2, "index", "2"); - + final List<FlowFile> parents = new ArrayList<>(2); parents.add(ff1); parents.add(ff2); - + final FlowFile child = session.create(parents); - + final Relationship rel = new Relationship.Builder().name("A").build(); - + session.transfer(ff1, rel); session.transfer(ff2, rel); session.transfer(child, rel); - + session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000); // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's assertEquals(3, events.size()); - + int joinCount = 0; int ff1UpdateCount = 0; int ff2UpdateCount = 0; 
- - for ( final ProvenanceEventRecord event : events ) { + + for (final ProvenanceEventRecord event : events) { switch (event.getEventType()) { case JOIN: assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid()); joinCount++; break; case ATTRIBUTES_MODIFIED: - if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) { + if (event.getFlowFileUuid().equals(ff1.getAttribute("uuid"))) { ff1UpdateCount++; - } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) { + } else if (event.getFlowFileUuid().equals(ff2.getAttribute("uuid"))) { ff2UpdateCount++; } else { Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid()); @@ -487,14 +527,14 @@ public class TestStandardProcessSession { Assert.fail("Unexpected event type: " + event); } } - + assertEquals(1, joinCount); assertEquals(1, ff1UpdateCount); assertEquals(1, ff2UpdateCount); - + assertEquals(1, joinCount); } - + @Test public void testForkOneToOneReported() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -804,34 +844,34 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }).build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { 
+ return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }).build(); flowFileQueue.put(flowFileRecord); FlowFile ff1 = session.get(); @@ -844,35 +884,35 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .contentClaimOffset(1000L).size(1L).build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }) + .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. 
@@ -933,21 +973,20 @@ public class TestStandardProcessSession { } } - @Test public void testCreateEmitted() throws IOException { FlowFile newFlowFile = session.create(); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CREATE, event.getEventType()); } - + @Test public void testContentModifiedNotEmittedForCreate() throws IOException { FlowFile newFlowFile = session.create(); @@ -958,23 +997,23 @@ public class TestStandardProcessSession { }); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CREATE, event.getEventType()); } - + @Test public void testContentModifiedEmittedAndNotAttributesModified() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") - .build(); + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); this.flowFileQueue.put(flowFile); - + FlowFile existingFlowFile = session.get(); existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() { @Override @@ -984,38 +1023,36 @@ public class TestStandardProcessSession { existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final 
ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType()); } - + @Test public void testAttributesModifiedEmitted() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") - .build(); + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); this.flowFileQueue.put(flowFile); - + FlowFile existingFlowFile = session.get(); existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType()); } - - - + private static class MockFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L); @@ -1082,7 +1119,7 @@ public class TestStandardProcessSession { @Override public void shutdown() { } - + public Set<ContentClaim> getExistingClaims() { final Set<ContentClaim> claims = new HashSet<>();
