This is an automated email from the ASF dual-hosted git repository.

ymdavis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c604923  NIFI-6947: Add PutRecord processor using RecordSinkService (#3943)
c604923 is described below

commit c604923c0b20533299cbc2fa7e88dbfc0d790f80
Author: Matthew Burgess <[email protected]>
AuthorDate: Wed Jan 8 03:25:14 2020 -0500

    NIFI-6947: Add PutRecord processor using RecordSinkService (#3943)
    
    * NIFI-6947: Add PutRecord processor using RecordSinkService
    
    * NIFI-6947: Incorporated review comments
---
 .../nifi-standard-processors/pom.xml               |   4 +
 .../apache/nifi/processors/standard/PutRecord.java | 192 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../processors/standard/MockRecordSinkService.java |  92 ++++++++++
 .../nifi/processors/standard/TestPutRecord.java    | 150 ++++++++++++++++
 .../nifi/record/sink/RetryableIOException.java     |  41 +++++
 6 files changed, 480 insertions(+)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 72cdb65..9c77464 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -82,6 +82,10 @@
             <artifactId>nifi-record</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutRecord.java
new file mode 100644
index 0000000..180d44b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutRecord.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.record.sink.RetryableIOException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"record", "put", "sink"})
+@CapabilityDescription("The PutRecord processor uses a specified RecordReader to input (possibly multiple) records from an incoming flow file, and sends them "
+        + "to a destination specified by a Record Destination Service (i.e. record sink).")
+/**
+ * Processor that parses the content of each incoming FlowFile with the configured
+ * {@link RecordReaderFactory} and hands the resulting record set to the configured
+ * {@link RecordSinkService}. The original FlowFile is routed to success, retry or
+ * failure depending on the outcome of reading and transmitting the records.
+ */
+public class PutRecord extends AbstractProcessor {
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("put-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for reading incoming data")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder()
+            .name("put-record-sink")
+            .displayName("Record Destination Service")
+            .description("Specifies the Controller Service to use for writing out the records to some destination.")
+            .identifiesControllerService(RecordSinkService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder()
+            .name("put-record-include-zero-record-results")
+            .displayName("Include Zero Record Results")
+            .description("If no records are read from the incoming FlowFile, this property specifies whether or not an empty record set will be transmitted. The original "
+                    + "FlowFile will still be routed to success, but if no transmission occurs, no provenance SEND event will be generated.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    // Relationships
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The original FlowFile will be routed to this relationship if the records were transmitted successfully")
+            .build();
+
+    static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("The original FlowFile is routed to this relationship if the records could not be transmitted but attempting the operation again may succeed")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the records could not be transmitted and retrying the operation will also fail")
+            .build();
+
+    private static final List<PropertyDescriptor> properties;
+    private static final Set<Relationship> relationships;
+
+    // Resolved once per scheduling in onScheduled() and read by every onTrigger() call.
+    private volatile RecordSinkService recordSinkService;
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RECORD_READER);
+        props.add(RECORD_SINK);
+        props.add(INCLUDE_ZERO_RECORD_RESULTS);
+        properties = Collections.unmodifiableList(props);
+
+        final Set<Relationship> r = new HashSet<>();
+        r.add(REL_SUCCESS);
+        r.add(REL_FAILURE);
+        r.add(REL_RETRY);
+        relationships = Collections.unmodifiableSet(r);
+    }
+
+    /**
+     * @return the unmodifiable set containing the success, failure and retry relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * @return the unmodifiable list of properties: record reader, record sink and the zero-record flag
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Looks up the configured record sink and calls {@code reset()} on it before the
+     * processor starts handling FlowFiles (what a reset entails is up to the sink implementation).
+     *
+     * @param context the process context used to resolve the {@link #RECORD_SINK} property
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
+        recordSinkService.reset();
+    }
+
+    /**
+     * Reads the records of one FlowFile and sends them to the record sink.
+     * <ul>
+     *   <li>{@link RetryableIOException} routes the FlowFile to retry.</li>
+     *   <li>{@link MalformedRecordException} (thrown directly or wrapped as the cause of an
+     *       {@link IOException}) penalizes the FlowFile and routes it to failure.</li>
+     *   <li>Any other non-I/O, non-schema exception routes the FlowFile to failure.</li>
+     *   <li>Otherwise the FlowFile is routed to success; a provenance SEND event is emitted only
+     *       if records were written or empty record sets are configured to be transmitted.</li>
+     * </ul>
+     *
+     * @param context the process context supplying the property values
+     * @param session the session providing the FlowFile and provenance reporter
+     * @throws ProcessException if the schema cannot be determined or a non-retryable I/O error
+     *                          unrelated to malformed records occurs
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final StopWatch stopWatch = new StopWatch(true);
+
+        try (final InputStream in = session.read(flowFile)) {
+
+            final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER)
+                    .asControllerService(RecordReaderFactory.class);
+
+            // The reader is a resource in its own right: close it explicitly instead of relying on the
+            // underlying stream being closed. Exceptions still propagate to the catch clauses below.
+            try (final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+                final RecordSet recordSet = recordParser.createRecordSet();
+
+                final boolean transmitZeroRecords = context.getProperty(INCLUDE_ZERO_RECORD_RESULTS).asBoolean();
+                final WriteResult writeResult = recordSinkService.sendData(recordSet, new HashMap<>(flowFile.getAttributes()), transmitZeroRecords);
+                String recordSinkURL = writeResult.getAttributes().get("record.sink.url");
+                if (StringUtils.isEmpty(recordSinkURL)) {
+                    // The sink did not report where the data went; use a placeholder transit URI
+                    recordSinkURL = "unknown://";
+                }
+
+                final long transmissionMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
+                // Only record provenance if we sent any records
+                if (writeResult.getRecordCount() > 0 || transmitZeroRecords) {
+                    session.getProvenanceReporter().send(flowFile, recordSinkURL, transmissionMillis);
+                }
+            }
+
+        } catch (RetryableIOException rioe) {
+            getLogger().warn("Error during transmission of records due to {}, routing to retry", new Object[]{rioe.getMessage()}, rioe);
+            session.transfer(flowFile, REL_RETRY);
+            return;
+        } catch (SchemaNotFoundException snfe) {
+            throw new ProcessException("Error determining schema of flowfile records: " + snfe.getMessage(), snfe);
+        } catch (MalformedRecordException e) {
+            getLogger().error("Error reading records from {} due to {}, routing to failure", new Object[]{flowFile, e.getMessage()}, e);
+            session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } catch (IOException ioe) {
+            // The cause might be a MalformedRecordException (RecordReader wraps it in an IOException), send to failure in that case
+            if (ioe.getCause() instanceof MalformedRecordException) {
+                getLogger().error("Error reading records from {} due to {}, routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+                session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+            throw new ProcessException("Error reading from flowfile input stream: " + ioe.getMessage(), ioe);
+        } catch (Exception e) {
+            getLogger().error("Error during transmission of records due to {}, routing to failure", new Object[]{e.getMessage()}, e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 37bf6c2..f2c4f64 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -91,6 +91,7 @@ org.apache.nifi.processors.standard.PutEmail
 org.apache.nifi.processors.standard.PutFile
 org.apache.nifi.processors.standard.PutFTP
 org.apache.nifi.processors.standard.PutJMS
+org.apache.nifi.processors.standard.PutRecord
 org.apache.nifi.processors.standard.PutSFTP
 org.apache.nifi.processors.standard.PutSQL
 org.apache.nifi.processors.standard.PutSyslog
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockRecordSinkService.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockRecordSinkService.java
new file mode 100644
index 0000000..13d86ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockRecordSinkService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.record.sink.RetryableIOException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test double for {@link RecordSinkService}. It copies every record it is given into an
+ * in-memory list of field-name/value maps, remembers whether a transmission took place,
+ * and can be switched to throw a {@link RetryableIOException} instead of accepting data.
+ */
+public class MockRecordSinkService extends AbstractConfigurableComponent implements RecordSinkService {
+
+    // One map per received record, keyed by schema field name
+    private List<Map<String, Object>> rows = new ArrayList<>();
+    private boolean transmitted;
+    private boolean failWithRetryableError;
+
+    /** Creates a sink that accepts data. */
+    public MockRecordSinkService() {
+    }
+
+    /**
+     * @param failWithRetryableError whether {@code sendData} should throw a {@link RetryableIOException}
+     */
+    public MockRecordSinkService(boolean failWithRetryableError) {
+        this.failWithRetryableError = failWithRetryableError;
+    }
+
+    /**
+     * Drains the record set into {@link #getRows()}.
+     *
+     * @param recordSet       the records to capture
+     * @param attributes      ignored by this mock
+     * @param sendZeroResults whether an empty record set still counts as a transmission
+     * @return a write result carrying the number of captured records and no attributes
+     * @throws IOException a {@link RetryableIOException} if the mock is configured to fail,
+     *                     or whatever the record set throws while being read
+     */
+    @Override
+    public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
+        if (failWithRetryableError) {
+            throw new RetryableIOException("Retryable");
+        }
+
+        final RecordSchema schema = recordSet.getSchema();
+        int captured = 0;
+        for (Record current = recordSet.next(); current != null; current = recordSet.next()) {
+            final Map<String, Object> fieldValues = new HashMap<>();
+            for (final String fieldName : schema.getFieldNames()) {
+                fieldValues.put(fieldName, current.getValue(fieldName));
+            }
+            rows.add(fieldValues);
+            captured++;
+        }
+
+        // Once set, the flag stays set for the lifetime of the mock
+        transmitted |= (captured > 0 || sendZeroResults);
+        return WriteResult.of(captured, Collections.emptyMap());
+    }
+
+    /** @return the fixed identifier {@code "MockRecordSinkService"} */
+    @Override
+    public String getIdentifier() {
+        return "MockRecordSinkService";
+    }
+
+    /** No initialization is needed for the mock. */
+    @Override
+    public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
+    }
+
+    /** @return the live list of captured rows */
+    public List<Map<String, Object>> getRows() {
+        return rows;
+    }
+
+    /** @return {@code true} once any call to {@code sendData} counted as a transmission */
+    public boolean isTransmitted() {
+        return transmitted;
+    }
+
+    /** @param failWithRetryableError whether subsequent {@code sendData} calls should fail */
+    public void setFailWithRetryableError(boolean failWithRetryableError) {
+        this.failWithRetryableError = failWithRetryableError;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutRecord.java
new file mode 100644
index 0000000..3a3c839
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutRecord.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link PutRecord}, driven by a {@link MockRecordParser} as the record reader
+ * and a {@link MockRecordSinkService} as the record sink.
+ */
+public class TestPutRecord {
+
+    private static final String READER_ID = "reader";
+    private static final String SINK_ID = "MockRecordSinkService";
+
+    private TestRunner testRunner;
+    private MockRecordParser recordReader;
+    private MockRecordSinkService mockRecordSinkService;
+
+    @Before
+    public void setup() {
+        PutRecord processor = new PutRecord();
+        testRunner = TestRunners.newTestRunner(processor);
+        recordReader = new MockRecordParser();
+        testRunner.setProperty(PutRecord.RECORD_READER, READER_ID);
+        mockRecordSinkService = new MockRecordSinkService();
+        testRunner.setProperty(PutRecord.RECORD_SINK, SINK_ID);
+    }
+
+    /** Registers and enables whatever reader and sink instances the current test has set up. */
+    private void enableServices() throws Exception {
+        testRunner.addControllerService(READER_ID, recordReader);
+        testRunner.enableControllerService(recordReader);
+
+        testRunner.addControllerService(SINK_ID, mockRecordSinkService);
+        testRunner.enableControllerService(mockRecordSinkService);
+    }
+
+    /** Declares the name/age/sport schema shared by all tests on the current reader. */
+    private void addPersonSchema() {
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+    }
+
+    /** Queues the five sample records used by the happy-path and bad-record tests. */
+    private void addSampleRecords() {
+        recordReader.addRecord("John Doe", 48, "Soccer");
+        recordReader.addRecord("Jane Doe", 47, "Tennis");
+        recordReader.addRecord("Sally Doe", 47, "Curling");
+        recordReader.addRecord("Jimmy Doe", 14, null);
+        recordReader.addRecord("Pizza Doe", 14, null);
+    }
+
+    @Test
+    public void testSimplePut() throws Exception {
+        enableServices();
+        addPersonSchema();
+        addSampleRecords();
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutRecord.REL_SUCCESS, 1);
+        // Routing alone does not prove delivery: check that the sink actually received the records
+        assertTrue(mockRecordSinkService.isTransmitted());
+        assertTrue(mockRecordSinkService.getRows().size() == 5);
+        assertTrue("John Doe".equals(mockRecordSinkService.getRows().get(0).get("name")));
+    }
+
+    @Test
+    public void testNoRows() throws Exception {
+        enableServices();
+
+        testRunner.setProperty(PutRecord.INCLUDE_ZERO_RECORD_RESULTS, "false");
+
+        addPersonSchema();
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        assertTrue(mockRecordSinkService.getRows().isEmpty());
+        assertFalse(mockRecordSinkService.isTransmitted());
+        // Original flow file is still transferred
+        testRunner.assertAllFlowFilesTransferred(PutRecord.REL_SUCCESS, 1);
+        testRunner.clearTransferState();
+
+        // Send an empty record set anyway
+        testRunner.setProperty(PutRecord.INCLUDE_ZERO_RECORD_RESULTS, "true");
+        testRunner.enqueue("");
+        testRunner.run();
+
+        assertTrue(mockRecordSinkService.getRows().isEmpty());
+        assertTrue(mockRecordSinkService.isTransmitted());
+        // Original flow file is still transferred
+        testRunner.assertAllFlowFilesTransferred(PutRecord.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testBadRecords() throws Exception {
+        // Replace the default reader with one constructed with an argument of 1
+        // (presumably "fail after one record" - confirm against MockRecordParser)
+        recordReader = new MockRecordParser(1);
+        enableServices();
+        addPersonSchema();
+        addSampleRecords();
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutRecord.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testRetryableError() throws Exception {
+        recordReader = new MockRecordParser();
+        mockRecordSinkService.setFailWithRetryableError(true);
+        enableServices();
+        addPersonSchema();
+
+        recordReader.addRecord("John Doe", 48, "Soccer");
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutRecord.REL_RETRY, 1);
+        // The sink threw before consuming anything, so nothing may have been recorded
+        assertTrue(mockRecordSinkService.getRows().isEmpty());
+        assertFalse(mockRecordSinkService.isTransmitted());
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RetryableIOException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RetryableIOException.java
new file mode 100644
index 0000000..9fd4a65
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-sink-api/src/main/java/org/apache/nifi/record/sink/RetryableIOException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink;
+
+import java.io.IOException;
+
+/**
+ * Marker class for {@link IOException}s raised by an operation that may succeed if it is
+ * performed again. Throwing it is a hint to the caller to retry the operation rather than
+ * treat the failure as permanent.
+ */
+public class RetryableIOException extends IOException {
+
+    // IOException is Serializable; pin the version so the serialized form does not depend on compiler details
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception with no detail message and no cause.
+     */
+    public RetryableIOException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public RetryableIOException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause   the underlying failure
+     */
+    public RetryableIOException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param cause the underlying failure
+     */
+    public RetryableIOException(Throwable cause) {
+        super(cause);
+    }
+}

Reply via email to