exceptionfactory commented on a change in pull request #5732: URL: https://github.com/apache/nifi/pull/5732#discussion_r798184108
########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import 
java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); + + private List<PropertyDescriptor> descriptors = Collections.unmodifiableList(Arrays.asList(MAX_TEXT_LENGTH)); + private Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + return; + } + + @Override + public void onTrigger(final 
ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + flowFile = session.create(); + } + + final int maxTextLength = context.getProperty(MAX_TEXT_LENGTH).evaluateAttributeExpressions(flowFile).asInteger(); + final String filename = flowFile.getAttribute("filename"); + + try { + final AtomicReference<String> type = new AtomicReference<>(); + final AtomicReference<Boolean> wasError = new AtomicReference<>(false); + + flowFile = session.write(flowFile, (inputStream, outputStream) -> { + if (inputStream != null) { + BufferedInputStream buffStream = new BufferedInputStream(inputStream); + Tika tika = new Tika(); + String text = ""; + try { + type.set(tika.detect(buffStream, filename)); + tika.setMaxStringLength(maxTextLength); + text = tika.parseToString(buffStream); + + } catch (TikaException e) { + getLogger().error("Apache Tika failed to parse input " + e.getLocalizedMessage()); + wasError.set(true); + } + + outputStream.write(text.getBytes()); + buffStream.close(); + } else { + getLogger().error("Input file was null"); + wasError.set(true); + } + }); + + if (wasError.get()) { + session.transfer(flowFile, REL_FAILURE); + } else { + + Map<String, String> mimeAttrs = new HashMap<>(); + mimeAttrs.put("mime.type", TEXT_PLAIN); Review comment: This attribute name should be replaced with `CoreAttributes.MIME_TYPE.key()`. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/test/java/org/apache/nifi/processors/document/ExtractDocumentTextTest.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ExtractDocumentTextTest { + + private TestRunner testRunner; + + @BeforeEach + public void init() { + testRunner = TestRunners.newTestRunner(ExtractDocumentText.class); + } + + @Test + public void processor_should_support_pdf_types_without_exception() { + try { + final String filename = "simple.pdf"; + MockFlowFile flowFile = testRunner.enqueue(new FileInputStream("src/test/resources/" + filename)); + Map<String, String> attrs = new HashMap<String, String>() {{ put("filename", filename); }}; + flowFile.putAttributes(attrs); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + + testRunner.assertValid(); + testRunner.run(); + testRunner.assertTransferCount(ExtractDocumentText.REL_FAILURE, 0); + + List<MockFlowFile> successFiles = testRunner.getFlowFilesForRelationship(ExtractDocumentText.REL_SUCCESS); + for (MockFlowFile mockFile : successFiles) { + try { + String result = new String(mockFile.toByteArray(), "UTF-8"); + String trimmedResult = result.trim(); + 
assertTrue(trimmedResult.startsWith("A Simple PDF File")); + System.out.println("FILE:" + result); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); Review comment: All `System.out.println()` and `printStackTrace()` calls should be removed. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. 
Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); + + private List<PropertyDescriptor> descriptors = Collections.unmodifiableList(Arrays.asList(MAX_TEXT_LENGTH)); + private Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + return; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + flowFile = session.create(); + } + + final int maxTextLength = context.getProperty(MAX_TEXT_LENGTH).evaluateAttributeExpressions(flowFile).asInteger(); + final String filename = flowFile.getAttribute("filename"); + + try { + final AtomicReference<String> type = new AtomicReference<>(); + final AtomicReference<Boolean> wasError = new AtomicReference<>(false); + + flowFile = session.write(flowFile, (inputStream, outputStream) -> { + if (inputStream != null) { + BufferedInputStream buffStream = new BufferedInputStream(inputStream); + Tika tika = new Tika(); + String text = ""; + try { + type.set(tika.detect(buffStream, filename)); + tika.setMaxStringLength(maxTextLength); + text = tika.parseToString(buffStream); + + } catch (TikaException e) { + 
getLogger().error("Apache Tika failed to parse input " + e.getLocalizedMessage()); + wasError.set(true); + } + + outputStream.write(text.getBytes()); + buffStream.close(); + } else { + getLogger().error("Input file was null"); + wasError.set(true); + } + }); + + if (wasError.get()) { + session.transfer(flowFile, REL_FAILURE); + } else { + + Map<String, String> mimeAttrs = new HashMap<>(); + mimeAttrs.put("mime.type", TEXT_PLAIN); + mimeAttrs.put("orig.mime.type", type.get()); + + flowFile = session.putAllAttributes(flowFile, mimeAttrs); + session.transfer(flowFile, REL_SUCCESS); + } + } catch (final Throwable t) { + getLogger().error("Unable to process ExtractTextProcessor file " + t.getLocalizedMessage()); + getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); + // not sure about this one + session.transfer(flowFile, REL_FAILURE); + throw t; Review comment: The exception should not be thrown if the FlowFile is being routed to a relationship. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. 
Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); Review comment: ```suggestion .description("Content extraction failed").build(); ``` ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. 
Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); + + private List<PropertyDescriptor> descriptors = Collections.unmodifiableList(Arrays.asList(MAX_TEXT_LENGTH)); + private Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + return; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + flowFile = session.create(); + } + + final int maxTextLength = context.getProperty(MAX_TEXT_LENGTH).evaluateAttributeExpressions(flowFile).asInteger(); + final String filename = flowFile.getAttribute("filename"); + + try { + final AtomicReference<String> type = new AtomicReference<>(); + final AtomicReference<Boolean> wasError = new AtomicReference<>(false); + + flowFile = session.write(flowFile, (inputStream, outputStream) -> { + if (inputStream != null) { + BufferedInputStream buffStream = new BufferedInputStream(inputStream); + Tika tika = new Tika(); + String text = ""; + try { + type.set(tika.detect(buffStream, filename)); + tika.setMaxStringLength(maxTextLength); + text = tika.parseToString(buffStream); + + } catch (TikaException e) { + 
getLogger().error("Apache Tika failed to parse input " + e.getLocalizedMessage()); Review comment: The exception should be passed to the log for troubleshooting, as opposed to just the message. ```suggestion getLogger().error("Text parsing failed", e); ``` ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. 
Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); + + private List<PropertyDescriptor> descriptors = Collections.unmodifiableList(Arrays.asList(MAX_TEXT_LENGTH)); + private Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + return; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + flowFile = session.create(); + } + + final int maxTextLength = context.getProperty(MAX_TEXT_LENGTH).evaluateAttributeExpressions(flowFile).asInteger(); + final String filename = flowFile.getAttribute("filename"); + + try { + final AtomicReference<String> type = new AtomicReference<>(); + final AtomicReference<Boolean> wasError = new AtomicReference<>(false); + + flowFile = session.write(flowFile, (inputStream, outputStream) -> { + if (inputStream != null) { Review comment: This null check should not be necessary. 
########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import 
java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); + + private List<PropertyDescriptor> descriptors = Collections.unmodifiableList(Arrays.asList(MAX_TEXT_LENGTH)); + private Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + return; + } + + @Override + public void onTrigger(final 
ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + flowFile = session.create(); + } + + final int maxTextLength = context.getProperty(MAX_TEXT_LENGTH).evaluateAttributeExpressions(flowFile).asInteger(); + final String filename = flowFile.getAttribute("filename"); + + try { + final AtomicReference<String> type = new AtomicReference<>(); + final AtomicReference<Boolean> wasError = new AtomicReference<>(false); Review comment: Recommend renaming this variable to something like `exceptionThrown` or `errorFound`. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/test/java/org/apache/nifi/processors/document/ExtractDocumentTextTest.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ExtractDocumentTextTest { + + private TestRunner testRunner; + + @BeforeEach + public void init() { + testRunner = TestRunners.newTestRunner(ExtractDocumentText.class); + } + + @Test + public void processor_should_support_pdf_types_without_exception() { Review comment: Although the current code base is not consistent, method names should follow standard Java conventions and use camelCase naming. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. 
Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); + + private List<PropertyDescriptor> descriptors = Collections.unmodifiableList(Arrays.asList(MAX_TEXT_LENGTH)); + private Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + return; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + flowFile = session.create(); + } + + final int maxTextLength = context.getProperty(MAX_TEXT_LENGTH).evaluateAttributeExpressions(flowFile).asInteger(); + final String filename = flowFile.getAttribute("filename"); + + try { + final AtomicReference<String> type = new AtomicReference<>(); + final AtomicReference<Boolean> wasError = new AtomicReference<>(false); + + flowFile = session.write(flowFile, (inputStream, outputStream) -> { + if (inputStream != null) { + BufferedInputStream buffStream = new BufferedInputStream(inputStream); + Tika tika = new Tika(); + String text = ""; + try { + type.set(tika.detect(buffStream, filename)); + tika.setMaxStringLength(maxTextLength); + text = tika.parseToString(buffStream); + + } catch (TikaException e) { + 
getLogger().error("Apache Tika failed to parse input " + e.getLocalizedMessage()); + wasError.set(true); + } + + outputStream.write(text.getBytes()); + buffStream.close(); + } else { + getLogger().error("Input file was null"); + wasError.set(true); + } + }); + + if (wasError.get()) { + session.transfer(flowFile, REL_FAILURE); + } else { + + Map<String, String> mimeAttrs = new HashMap<>(); + mimeAttrs.put("mime.type", TEXT_PLAIN); + mimeAttrs.put("orig.mime.type", type.get()); + + flowFile = session.putAllAttributes(flowFile, mimeAttrs); + session.transfer(flowFile, REL_SUCCESS); + } + } catch (final Throwable t) { + getLogger().error("Unable to process ExtractTextProcessor file " + t.getLocalizedMessage()); + getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); Review comment: This should be combined into a single error log and the wrapping `Object[]` should be removed. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/test/java/org/apache/nifi/processors/document/ExtractDocumentTextTest.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ExtractDocumentTextTest { + + private TestRunner testRunner; + + @BeforeEach + public void init() { + testRunner = TestRunners.newTestRunner(ExtractDocumentText.class); + } + + @Test + public void processor_should_support_pdf_types_without_exception() { + try { + final String filename = "simple.pdf"; + MockFlowFile flowFile = testRunner.enqueue(new FileInputStream("src/test/resources/" + filename)); + Map<String, String> attrs = new HashMap<String, String>() {{ put("filename", filename); }}; + flowFile.putAttributes(attrs); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + + testRunner.assertValid(); + testRunner.run(); + testRunner.assertTransferCount(ExtractDocumentText.REL_FAILURE, 0); + + List<MockFlowFile> successFiles = testRunner.getFlowFilesForRelationship(ExtractDocumentText.REL_SUCCESS); + for (MockFlowFile mockFile : successFiles) { + try { + String result = new String(mockFile.toByteArray(), "UTF-8"); + String trimmedResult = result.trim(); + assertTrue(trimmedResult.startsWith("A Simple PDF File")); + System.out.println("FILE:" + result); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + } + + @Test + public void processor_should_support_doc_types_without_exception() { + try { + final String filename = "simple.doc"; Review comment: Having multiple sample binary files adds unnecessary content to the source code repository. 
Apache Tika supports a wide array of formats, so having multiple types of documents does not provide different processing in the NiFi code itself. For this reason, recommend removing all binary files and instead testing with some simple strings. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. 
Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); + + private List<PropertyDescriptor> descriptors = Collections.unmodifiableList(Arrays.asList(MAX_TEXT_LENGTH)); + private Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + return; + } Review comment: This method definition should be removed. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/test/java/org/apache/nifi/processors/document/ExtractDocumentTextTest.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ExtractDocumentTextTest { + + private TestRunner testRunner; + + @BeforeEach + public void init() { + testRunner = TestRunners.newTestRunner(ExtractDocumentText.class); + } + + @Test + public void processor_should_support_pdf_types_without_exception() { + try { + final String filename = "simple.pdf"; + MockFlowFile flowFile = testRunner.enqueue(new FileInputStream("src/test/resources/" + filename)); + Map<String, String> attrs = new HashMap<String, String>() {{ put("filename", filename); }}; Review comment: Declaring anonymous dynamic instances of HashMap should be avoided. This can be refactored to use `Collections.singletonMap()`. ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public 
class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); Review comment: Recommend removing the trailing period character and simplifying the description: ```suggestion .description("Content extraction success").build(); ``` ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + + "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; Review comment: Recommend using lowercase, hyphen-separated property names: ```suggestion public static final String FIELD_MAX_TEXT_LENGTH = "max-text-length"; ``` ########## File path: nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/document/ExtractDocumentText.java ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.document; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.tika.Tika; +import org.apache.tika.exception.TikaException; + +import java.io.BufferedInputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"extract, text, pdf, word, excel, powerpoint, office"}) +@CapabilityDescription("Run Apache Tika text extraction to extra the text from supported binary file formats such as PDF " + 
+ "and Microsoft Office files.") +public class ExtractDocumentText extends AbstractProcessor { + private static final String TEXT_PLAIN = "text/plain"; + + public static final String FIELD_MAX_TEXT_LENGTH = "MAX_TEXT_LENGTH"; + public static final String FIELD_SUCCESS = "success"; + public static final String FIELD_FAILURE = "failure"; + + public static final PropertyDescriptor MAX_TEXT_LENGTH = new PropertyDescriptor.Builder() + .name(FIELD_MAX_TEXT_LENGTH) + .displayName("Max Output Text Length") + .description("The maximum length of text to retrieve. This is used to limit memory usage for " + + "dealing with large files. Specify -1 for unlimited length.") + .required(false).defaultValue("-1").addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name(FIELD_SUCCESS) + .description("Successfully extract content.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name(FIELD_FAILURE) + .description("Failed to extract content.").build(); + + private List<PropertyDescriptor> descriptors = Collections.unmodifiableList(Arrays.asList(MAX_TEXT_LENGTH)); + private Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + return; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + flowFile = session.create(); Review comment: This should be changed to a short-circuit return instead of creating a new 
empty FlowFile. ```suggestion return; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
