This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 35cf9f87be NIFI-11179 Removed Flume components
35cf9f87be is described below
commit 35cf9f87be6ce64fb8d58db1eee3c9e4ee4de484
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Feb 14 15:16:57 2023 -0500
NIFI-11179 Removed Flume components
This closes #6950
Signed-off-by: David Handermann <[email protected]>
---
.../nifi-flume-bundle/nifi-flume-nar/pom.xml | 136 ---------
.../src/main/resources/META-INF/LICENSE | 319 ---------------------
.../src/main/resources/META-INF/NOTICE | 186 ------------
.../nifi-flume-processors/pom.xml | 219 --------------
.../processors/flume/AbstractFlumeProcessor.java | 160 -----------
.../nifi/processors/flume/ExecuteFlumeSink.java | 158 ----------
.../nifi/processors/flume/ExecuteFlumeSource.java | 210 --------------
.../nifi/processors/flume/NifiChannelSelector.java | 69 -----
.../nifi/processors/flume/NifiSessionChannel.java | 47 ---
.../flume/NifiSessionFactoryChannel.java | 50 ----
.../processors/flume/NifiSinkSessionChannel.java | 49 ----
.../nifi/processors/flume/NifiSinkTransaction.java | 72 -----
.../nifi/processors/flume/NifiTransaction.java | 54 ----
.../nifi/processors/flume/util/FlowFileEvent.java | 119 --------
.../flume/util/FlowFileEventConstants.java | 37 ---
.../services/org.apache.nifi.processor.Processor | 16 --
.../additionalDetails.html | 155 ----------
.../additionalDetails.html | 114 --------
.../processors/flume/ExecuteFlumeSinkTest.java | 168 -----------
.../processors/flume/ExecuteFlumeSourceTest.java | 151 ----------
.../src/test/resources/core-site-broken.xml | 25 --
.../src/test/resources/core-site.xml | 30 --
.../src/test/resources/simplelogger.properties | 21 --
.../src/test/resources/testdata/records.txt | 4 -
nifi-nar-bundles/nifi-flume-bundle/pom.xml | 51 ----
nifi-nar-bundles/pom.xml | 1 -
26 files changed, 2621 deletions(-)
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
deleted file mode 100644
index 0c03ce3758..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml
+++ /dev/null
@@ -1,136 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-flume-bundle</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>nifi-flume-nar</artifactId>
- <packaging>nar</packaging>
- <properties>
- <maven.javadoc.skip>true</maven.javadoc.skip>
- <source.skip>true</source.skip>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-flume-processors</artifactId>
- <!-- The following are inherited from nifi-hadoop-libraries-nar -->
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-hadoop-libraries-nar</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <type>nar</type>
- </dependency>
- </dependencies>
-</project>
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE
deleted file mode 100644
index c1a3ec4fd2..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE
+++ /dev/null
@@ -1,319 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
-APACHE NIFI SUBCOMPONENTS:
-
-The Apache NiFi project contains subcomponents with separate copyright
-notices and license terms. Your use of the source code for the these
-subcomponents is subject to the terms and conditions of the following
-licenses.
-
-The binary distribution of this product bundles 'SUAsync Library' which is
-available under a 3-Clause BSD License.
-
- Copyright (c) 2010 StumbleUpon, Inc. All rights reserved.
-
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
- - Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
- - Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- - Neither the name of the StumbleUpon nor the names of its contributors
- may be used to endorse or promote products derived from this software
- without specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- POSSIBILITY OF SUCH DAMAGE.
-
-The binary distribution of this product bundles 'Asynchronous HBase Client'
-which is available under a 3-Clause BSD License.
-
- Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved.
-
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
- - Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
- - Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- - Neither the name of the StumbleUpon nor the names of its contributors
- may be used to endorse or promote products derived from this software
- without specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- POSSIBILITY OF SUCH DAMAGE.
-
-The binary distribution of this product bundles 'JOpt Simple' which is
-available under the MIT license.
-
- The MIT License
-
- Copyright (c) 2004-2011 Paul R. Holser, Jr.
-
- Permission is hereby granted, free of charge, to any person obtaining
- a copy of this software and associated documentation files (the
- "Software"), to deal in the Software without restriction, including
- without limitation the rights to use, copy, modify, merge, publish,
- distribute, sublicense, and/or sell copies of the Software, and to
- permit persons to whom the Software is furnished to do so, subject to
- the following conditions:
-
- The above copyright notice and this permission notice shall be
- included in all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
- LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
- OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
- WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-The binary distribution of this product bundles 'Scala Library' under a BSD
-style license.
-
- Copyright (c) 2002-2015 EPFL
- Copyright (c) 2011-2015 Typesafe, Inc.
-
- All rights reserved.
-
- Redistribution and use in source and binary forms, with or without modification,
- are permitted provided that the following conditions are met:
-
- Redistributions of source code must retain the above copyright notice, this list of
- conditions and the following disclaimer.
-
- Redistributions in binary form must reproduce the above copyright notice, this list of
- conditions and the following disclaimer in the documentation and/or other materials
- provided with the distribution.
-
- Neither the name of the EPFL nor the names of its contributors may be used to endorse
- or promote products derived from this software without specific prior written permission.
-
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS
- OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
- AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
- IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
- OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE
deleted file mode 100644
index 466e5ab18d..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE
+++ /dev/null
@@ -1,186 +0,0 @@
-nifi-flume-nar
-Copyright 2015-2020 The Apache Software Foundation
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
-******************
-Apache Software License v2
-******************
-
-The following binary components are provided under the Apache Software License v2
-
- (ASLv2) Apache Avro
- The following NOTICE information applies:
- Apache Avro
- Copyright 2009-2017 The Apache Software Foundation
-
- (ASLv2) Apache Commons JEXL
- The following NOTICE information applies:
- Apache Commons JEXL
- Copyright 2001-2011 The Apache Software Foundation
-
- (ASLv2) Apache Commons Lang
- The following NOTICE information applies:
- Apache Commons Lang
- Copyright 2001-2015 The Apache Software Foundation
-
- This product includes software from the Spring Framework,
- under the Apache License 2.0 (see: StringUtils.containsWhitespace())
-
- (ASLv2) Apache Flume
- The following NOTICE information applies:
- Apache Flume
- Copyright 2011-2015 Apache Software Foundation
-
- asynchbase is BSD-licensed software (https://github.com/OpenTSDB/asynchbase)
-
- async is BSD-licensed software (https://github.com/stumbleupon/async)
-
- jopt-simple is MIT licensed software (http://pholser.github.io/jopt-simple/license.html)
-
- scala-library is BSD-like licensed software (http://www.scala-lang.org/license.html)
-
- (ASLv2) Xalan
- This product includes software developed by
- The Apache Software Foundation (http://www.apache.org/).
-
- Portions of this software was originally based on the following:
-
- - software copyright (c) 1999-2002, Lotus Development Corporation., http://www.lotus.com.
- - software copyright (c) 2001-2002, Sun Microsystems., http://www.sun.com.
- - software copyright (c) 2003, IBM Corporation., http://www.ibm.com.
- - voluntary contributions made by Ovidiu Predescu ([email protected]) on behalf of the
- Apache Software Foundation and was originally developed at Hewlett Packard Company.
-
- (ASLv2) Apache Xerces Java
- Copyright 1999-2007 The Apache Software Foundation
-
- This product includes software developed at
- The Apache Software Foundation (http://www.apache.org/).
-
- Portions of this software were originally based on the following:
- - software copyright (c) 1999, IBM Corporation., http://www.ibm.com.
- - software copyright (c) 1999, Sun Microsystems., http://www.sun.com.
- - voluntary contributions made by Paul Eng on behalf of the
- Apache Software Foundation that were originally developed at iClick, Inc.,
- software copyright (c) 1999.
-
- (ASLv2) Apache XML Commons XML APIs
- Copyright 2006 The Apache Software Foundation.
-
- This product includes software developed at
- The Apache Software Foundation (http://www.apache.org/).
-
- Portions of this software were originally based on the following:
- - software copyright (c) 1999, IBM Corporation., http://www.ibm.com.
- - software copyright (c) 1999, Sun Microsystems., http://www.sun.com.
- - software copyright (c) 2000 World Wide Web Consortium, http://www.w3.org
-
- (ASLv2) IRClib
- The following NOTICE information applies:
- IRClib -- A Java Internet Relay Chat library --
- Copyright (C) 2002 - 2006 Christoph Schwering <[email protected]>
-
- (ASLv2) Jackson JSON processor
- The following NOTICE information applies:
- # Jackson JSON processor
-
- Jackson is a high-performance, Free/Open Source JSON processing library.
- It was originally written by Tatu Saloranta ([email protected]), and has
- been in development since 2007.
- It is currently developed by a community of developers, as well as supported
- commercially by FasterXML.com.
-
- ## Licensing
-
- Jackson core and extension components may licensed under different licenses.
- To find the details that apply to this artifact see the accompanying LICENSE file.
- For more information, including possible other licensing options, contact
- FasterXML.com (http://fasterxml.com).
-
- ## Credits
-
- A list of contributors may be found from CREDITS file, which is included
- in some artifacts (usually source distributions); but is always available
- from the source code management (SCM) system project uses.
-
- (ASLv2) Joda-Time
- The following NOTICE information applies:
- =============================================================================
- = NOTICE file corresponding to section 4d of the Apache License Version 2.0 =
- =============================================================================
- This product includes software developed by
- Joda.org (http://www.joda.org/).
-
- (ASLv2) Apache Kafka
- The following NOTICE information applies:
- Apache Kafka
- Copyright 2012 The Apache Software Foundation.
-
- (ASLv2) Kite SDK
- The following NOTICE information applies:
- This product includes software developed by Cloudera, Inc.
- (http://www.cloudera.com/).
-
- This product includes software developed at
- The Apache Software Foundation (http://www.apache.org/).
-
- This product includes software developed by
- Saxonica (http://www.saxonica.com/).
-
- (ASLv2) Apache Thrift
- The following NOTICE information applies:
- Apache Thrift
- Copyright 2006-2010 The Apache Software Foundation.
-
- (ASLv2) Yammer Metrics
- The following NOTICE information applies:
- Metrics
- Copyright 2010-2012 Coda Hale and Yammer, Inc.
-
- This product includes software developed by Coda Hale and Yammer, Inc.
-
- This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released
- with the following comments:
-
- Written by Doug Lea with assistance from members of JCP JSR-166
- Expert Group and released to the public domain, as explained at
- http://creativecommons.org/publicdomain/zero/1.0/
-
- (ASLv2) Apache MINA
- The following NOTICE information applies:
- Apache MINA Core
- Copyright 2004-2011 Apache MINA Project
-
- (ASLv2) The Netty Project
- The following NOTICE information applies:
- The Netty Project
- Copyright 2011 The Netty Project
-
- (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
-
- (ASLv2) Parquet MR
- The following NOTICE information applies:
- Parquet MR
- Copyright 2012 Twitter, Inc.
-
- This project includes code from https://github.com/lemire/JavaFastPFOR
- parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java
- Apache License Version 2.0 http://www.apache.org/licenses/.
- (c) Daniel Lemire, http://lemire.me/en/
-
- (ASLv2) Jetty
- The following NOTICE information applies:
- Jetty Web Container
- Copyright 1995-2019 Mort Bay Consulting Pty Ltd.
-
- (ASLv2) Apache Velocity
- The following NOTICE information applies:
- Apache Velocity
- Copyright (C) 2000-2007 The Apache Software Foundation
-
- (ASLv2) ZkClient
- The following NOTICE information applies:
- ZkClient
- Copyright 2009 Stefan Groschupf
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
deleted file mode 100644
index ba7e6e0c8c..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
+++ /dev/null
@@ -1,219 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-flume-bundle</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>nifi-flume-processors</artifactId>
- <packaging>jar</packaging>
-
- <properties>
- <flume.version>1.11.0</flume.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-flowfile-packager</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- <version>${flume.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>${flume.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <!-- not allowed findbugs version pulled in by guava version - this should be fixed by flume... -->
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- pulled in to replace the not allowed version of findbugs -->
- <dependency>
- <groupId>com.github.stephenc.findbugs</groupId>
- <artifactId>findbugs-annotations</artifactId>
- <version>1.3.9-1</version>
- </dependency>
-
- <!-- Flume Sources -->
-
- <dependency>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-jms-source</artifactId>
- <version>${flume.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-kafka-source</artifactId>
- <version>${flume.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-scribe-source</artifactId>
- <version>${flume.version}</version>
- </dependency>
-
- <!-- Flume Sinks -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-hdfs-sink</artifactId>
- <version>${flume.version}</version>
- </dependency>
-
- <!-- HDFS sink dependencies -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-reload4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-hive-sink</artifactId>
- <version>${flume.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-irc-sink</artifactId>
- <version>${flume.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-ng-hbase-sink</artifactId>
- <version>${flume.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-ng-kafka-sink</artifactId>
- <version>${flume.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-ng-morphline-solr-sink</artifactId>
- <version>${flume.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <configuration>
- <excludes>
- <!-- test data -->
- <exclude>src/test/resources/testdata/*</exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <systemPropertyVariables>
- <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
- </systemPropertyVariables>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
deleted file mode 100644
index 13d2ff7412..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import com.google.common.collect.Maps;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.SinkFactory;
-import org.apache.flume.SourceFactory;
-import org.apache.flume.sink.DefaultSinkFactory;
-import org.apache.flume.source.DefaultSourceFactory;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processors.flume.util.FlowFileEvent;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.StringReader;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * This is a base class that is helpful when building processors interacting with Flume.
- */
-public abstract class AbstractFlumeProcessor extends
AbstractSessionFactoryProcessor {
-
- protected static final SourceFactory SOURCE_FACTORY = new
DefaultSourceFactory();
- protected static final SinkFactory SINK_FACTORY = new DefaultSinkFactory();
-
- protected static Event flowFileToEvent(FlowFile flowFile, ProcessSession
session) {
- return new FlowFileEvent(flowFile, session);
- }
-
- protected static void transferEvent(final Event event, ProcessSession
session,
- Relationship relationship) {
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, event.getHeaders());
-
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- out.write(event.getBody());
- }
- });
-
- session.getProvenanceReporter()
- .create(flowFile);
- session.transfer(flowFile, relationship);
- }
-
- protected static Validator createSourceValidator() {
- return new Validator() {
- @Override
- public ValidationResult validate(final String subject, final
String value, final ValidationContext context) {
- String reason = null;
- try {
- ExecuteFlumeSource.SOURCE_FACTORY.create("NiFi Source",
value);
- } catch (Exception ex) {
- reason = ex.getLocalizedMessage();
- reason = Character.toLowerCase(reason.charAt(0)) +
reason.substring(1);
- }
- return new ValidationResult.Builder().subject(subject)
- .input(value)
- .explanation(reason)
- .valid(reason == null)
- .build();
- }
- };
- }
-
- protected static Validator createSinkValidator() {
- return new Validator() {
- @Override
- public ValidationResult validate(final String subject, final
String value, final ValidationContext context) {
- String reason = null;
- try {
- ExecuteFlumeSink.SINK_FACTORY.create("NiFi Sink", value);
- } catch (Exception ex) {
- reason = ex.getLocalizedMessage();
- reason = Character.toLowerCase(reason.charAt(0)) +
reason.substring(1);
- }
- return new ValidationResult.Builder().subject(subject)
- .input(value)
- .explanation(reason)
- .valid(reason == null)
- .build();
- }
- };
- }
-
- protected static Context getFlumeContext(String flumeConfig, String
prefix) {
- Properties flumeProperties = new Properties();
- if (flumeConfig != null) {
- try {
- flumeProperties.load(new StringReader(flumeConfig));
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
- Map<String, String> parameters = Maps.newHashMap();
- for (String property : flumeProperties.stringPropertyNames()) {
- parameters.put(property, flumeProperties.getProperty(property));
- }
- return new Context(new Context(parameters).getSubProperties(prefix));
- }
-
- protected static Context getFlumeSourceContext(String flumeConfig,
- String agentName, String sourceName) {
- return getFlumeContext(flumeConfig, agentName + ".sources." +
sourceName + ".");
- }
-
- protected static Context getFlumeSinkContext(String flumeConfig,
- String agentName, String sinkName) {
- return getFlumeContext(flumeConfig, agentName + ".sinks." + sinkName +
".");
- }
-
- /*
- * Borrowed from AbstractProcessor. The FlumeSourceProcessor needs to implement this directly
- * to handle event driven sources, but it's marked final in AbstractProcessor.
- */
- @Override
- public void onTrigger(final ProcessContext context, final
ProcessSessionFactory sessionFactory) throws ProcessException {
- final ProcessSession session = sessionFactory.createSession();
- try {
- onTrigger(context, session);
- session.commitAsync();
- } catch (final Throwable t) {
- getLogger()
- .error("{} failed to process due to {}; rolling back session",
new Object[]{this, t});
- session.rollback(true);
- throw t;
- }
- }
-
- public abstract void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException;
-
-}
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
deleted file mode 100644
index 8e7669ea83..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
-import org.apache.flume.conf.Configurables;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.Restricted;
-import org.apache.nifi.annotation.behavior.Restriction;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import java.util.List;
-import java.util.Set;
-
-/**
- * This processor runs a Flume sink
- */
-@TriggerSerially
-@Tags({"flume", "hadoop", "put", "sink"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Execute a Flume sink. Each input FlowFile is converted into a Flume Event for processing by the sink.")
-@Restricted(
- restrictions = {
- @Restriction(
- requiredPermission = RequiredPermission.EXECUTE_CODE,
- explanation = "Provides operator the ability to execute arbitrary Flume configurations assuming all permissions that NiFi has.")
- }
-)
-@DeprecationNotice(reason = "Apache Flume pipelines should be implemented using Apache NiFi components")
-public class ExecuteFlumeSink extends AbstractFlumeProcessor {
-
- public static final PropertyDescriptor SINK_TYPE = new
PropertyDescriptor.Builder()
- .name("Sink Type")
- .description("The component type name for the sink. For some sinks, this is a short, symbolic name (e.g. hdfs)."
- + " For others, it's the fully-qualified name of the Sink class. See the Flume User Guide for details.")
- .required(true)
- .addValidator(createSinkValidator())
- .build();
- public static final PropertyDescriptor AGENT_NAME = new
PropertyDescriptor.Builder()
- .name("Agent Name")
- .description("The name of the agent used in the Flume sink
configuration")
- .required(true)
- .defaultValue("tier1")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor SOURCE_NAME = new
PropertyDescriptor.Builder()
- .name("Sink Name")
- .description("The name of the sink used in the Flume sink
configuration")
- .required(true)
- .defaultValue("sink-1")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor FLUME_CONFIG = new
PropertyDescriptor.Builder()
- .name("Flume Configuration")
- .description("The Flume configuration for the sink copied from the
flume.properties file")
- .required(true)
- .defaultValue("")
- .addValidator(Validator.VALID)
- .build();
-
- public static final Relationship SUCCESS = new
Relationship.Builder().name("success").build();
- public static final Relationship FAILURE = new
Relationship.Builder().name("failure").build();
-
- private List<PropertyDescriptor> descriptors;
- private Set<Relationship> relationships;
-
- private volatile Sink sink;
- private volatile NifiSinkSessionChannel channel;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- this.descriptors = ImmutableList.of(SINK_TYPE, AGENT_NAME,
SOURCE_NAME, FLUME_CONFIG);
- this.relationships = ImmutableSet.of(SUCCESS, FAILURE);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- try {
- channel = new NifiSinkSessionChannel(SUCCESS, FAILURE);
- channel.start();
-
- sink =
SINK_FACTORY.create(context.getProperty(SOURCE_NAME).getValue(),
- context.getProperty(SINK_TYPE).getValue());
- sink.setChannel(channel);
-
- String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
- String agentName = context.getProperty(AGENT_NAME).getValue();
- String sinkName = context.getProperty(SOURCE_NAME).getValue();
- Configurables.configure(sink,
- getFlumeSinkContext(flumeConfig, agentName, sinkName));
-
- sink.start();
- } catch (Throwable th) {
- getLogger().error("Error creating sink", th);
- throw Throwables.propagate(th);
- }
- }
-
- @OnStopped
- public void stopped() {
- sink.stop();
- channel.stop();
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
-
- channel.setSession(session);
- try {
- sink.process();
- } catch (EventDeliveryException ex) {
- throw new ProcessException("Flume event delivery failed", ex);
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
deleted file mode 100644
index 12c759278a..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.EventDrivenSource;
-import org.apache.flume.PollableSource;
-import org.apache.flume.Source;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.source.EventDrivenSourceRunner;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.Restricted;
-import org.apache.nifi.annotation.behavior.Restriction;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.DeprecationNotice;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * This processor runs a Flume source
- */
-@TriggerSerially
-@Tags({"flume", "hadoop", "get", "source"})
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Execute a Flume source. Each Flume Event is sent to the success relationship as a FlowFile")
-@Restricted(
- restrictions = {
- @Restriction(
- requiredPermission = RequiredPermission.EXECUTE_CODE,
- explanation = "Provides operator the ability to execute arbitrary Flume configurations assuming all permissions that NiFi has.")
- }
-)
-@DeprecationNotice(reason = "Apache Flume pipelines should be implemented using Apache NiFi components")
-public class ExecuteFlumeSource extends AbstractFlumeProcessor {
-
- public static final PropertyDescriptor SOURCE_TYPE = new
PropertyDescriptor.Builder()
- .name("Source Type")
- .description("The component type name for the source. For some sources, this is a short, symbolic name"
- + " (e.g. spooldir). For others, it's the fully-qualified name of the Source class. See the Flume User Guide for details.")
- .required(true)
- .addValidator(createSourceValidator())
- .build();
- public static final PropertyDescriptor AGENT_NAME = new
PropertyDescriptor.Builder()
- .name("Agent Name")
- .description("The name of the agent used in the Flume source
configuration")
- .required(true)
- .defaultValue("tier1")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor SOURCE_NAME = new
PropertyDescriptor.Builder()
- .name("Source Name")
- .description("The name of the source used in the Flume source
configuration")
- .required(true)
- .defaultValue("src-1")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor FLUME_CONFIG = new
PropertyDescriptor.Builder()
- .name("Flume Configuration")
- .description("The Flume configuration for the source copied from the
flume.properties file")
- .required(true)
- .defaultValue("")
- .addValidator(Validator.VALID)
- .build();
-
- public static final Relationship SUCCESS = new
Relationship.Builder().name("success")
- .build();
-
- private List<PropertyDescriptor> descriptors;
- private Set<Relationship> relationships;
-
- private volatile Source source;
-
- private final NifiSessionChannel pollableSourceChannel = new
NifiSessionChannel(SUCCESS);
- private final AtomicReference<ProcessSessionFactory> sessionFactoryRef =
new AtomicReference<>(null);
- private final AtomicReference<EventDrivenSourceRunner> runnerRef = new
AtomicReference<>(null);
- private final AtomicReference<NifiSessionFactoryChannel>
eventDrivenSourceChannelRef = new AtomicReference<>(null);
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- this.descriptors = ImmutableList.of(SOURCE_TYPE, AGENT_NAME,
SOURCE_NAME, FLUME_CONFIG);
- this.relationships = ImmutableSet.of(SUCCESS);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @OnScheduled
- public void onScheduled(final ProcessContext context) {
- try {
- source = SOURCE_FACTORY.create(
- context.getProperty(SOURCE_NAME).getValue(),
- context.getProperty(SOURCE_TYPE).getValue());
-
- String flumeConfig = context.getProperty(FLUME_CONFIG).getValue();
- String agentName = context.getProperty(AGENT_NAME).getValue();
- String sourceName = context.getProperty(SOURCE_NAME).getValue();
- Configurables.configure(source,
- getFlumeSourceContext(flumeConfig, agentName, sourceName));
-
- if (source instanceof PollableSource) {
- source.setChannelProcessor(new ChannelProcessor(
- new NifiChannelSelector(pollableSourceChannel)));
- source.start();
- }
- } catch (Throwable th) {
- getLogger().error("Error creating source", th);
- throw Throwables.propagate(th);
- }
- }
-
- @OnStopped
- public void stopped() {
- if (source instanceof PollableSource) {
- source.stop();
- } else {
- EventDrivenSourceRunner runner = runnerRef.get();
- if (runner != null) {
- runner.stop();
- runnerRef.compareAndSet(runner, null);
- }
-
- NifiSessionFactoryChannel eventDrivenSourceChannel =
eventDrivenSourceChannelRef.get();
- if (eventDrivenSourceChannel != null) {
- eventDrivenSourceChannel.stop();
- eventDrivenSourceChannelRef.compareAndSet(eventDrivenSourceChannel, null);
- }
- }
- sessionFactoryRef.set(null);
- }
-
- @Override
- public void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException {
- if (source instanceof PollableSource) {
- super.onTrigger(context, sessionFactory);
- } else if (source instanceof EventDrivenSource) {
- ProcessSessionFactory old =
sessionFactoryRef.getAndSet(sessionFactory);
- if (old != sessionFactory) {
- if (runnerRef.get() != null) {
- stopped();
- sessionFactoryRef.set(sessionFactory);
- }
-
- runnerRef.set(new EventDrivenSourceRunner());
- eventDrivenSourceChannelRef.set(new
NifiSessionFactoryChannel(sessionFactoryRef.get(), SUCCESS));
- eventDrivenSourceChannelRef.get().start();
- source.setChannelProcessor(new ChannelProcessor(
- new NifiChannelSelector(eventDrivenSourceChannelRef.get())));
- runnerRef.get().setSource(source);
- runnerRef.get().start();
- }
- }
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- if (source instanceof PollableSource) {
- PollableSource pollableSource = (PollableSource) source;
- try {
- pollableSourceChannel.setSession(session);
- pollableSource.process();
- } catch (EventDeliveryException ex) {
- throw new ProcessException("Error processing pollable source",
ex);
- }
- } else {
- throw new ProcessException("Invalid source type: " +
source.getClass().getSimpleName());
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
deleted file mode 100644
index 2b0ba77616..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import com.google.common.collect.ImmutableList;
-import java.util.List;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-
-
-public class NifiChannelSelector implements ChannelSelector {
- private String name;
- private final List<Channel> requiredChannels;
- private final List<Channel> optionalChannels;
-
- public NifiChannelSelector(Channel channel) {
- requiredChannels = ImmutableList.of(channel);
- optionalChannels = ImmutableList.of();
- }
-
- @Override
- public List<Channel> getRequiredChannels(Event event) {
- return requiredChannels;
- }
-
- @Override
- public List<Channel> getOptionalChannels(Event event) {
- return optionalChannels;
- }
-
- @Override
- public List<Channel> getAllChannels() {
- return requiredChannels;
- }
-
- @Override
- public void setChannels(List<Channel> channels) {
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public void setName(String name) {
- this.name = name;
- }
-
- @Override
- public void configure(Context context) {
- }
-}
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionChannel.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionChannel.java
deleted file mode 100644
index 4c111af6b8..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionChannel.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import org.apache.flume.Context;
-import org.apache.flume.channel.BasicChannelSemantics;
-import org.apache.flume.channel.BasicTransactionSemantics;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-
-public class NifiSessionChannel extends BasicChannelSemantics {
-
- private ProcessSession session;
- private final Relationship relationship;
-
- public NifiSessionChannel(Relationship relationship) {
- this.relationship = relationship;
- }
-
- public void setSession(ProcessSession session) {
- this.session = session;
- }
-
- @Override
- protected BasicTransactionSemantics createTransaction() {
- return new NifiTransaction(session, relationship);
- }
-
- @Override
- public void configure(Context context) {
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java
deleted file mode 100644
index eb31a66420..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import org.apache.flume.ChannelFullException;
-import org.apache.flume.Context;
-import org.apache.flume.channel.BasicChannelSemantics;
-import org.apache.flume.channel.BasicTransactionSemantics;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Relationship;
-
-public class NifiSessionFactoryChannel extends BasicChannelSemantics {
-
- private final ProcessSessionFactory sessionFactory;
- private final Relationship relationship;
-
- public NifiSessionFactoryChannel(ProcessSessionFactory sessionFactory,
Relationship relationship) {
- this.sessionFactory = sessionFactory;
- this.relationship = relationship;
- }
-
- @Override
- protected BasicTransactionSemantics createTransaction() {
- LifecycleState lifecycleState = getLifecycleState();
- if (lifecycleState == LifecycleState.STOP) {
- throw new ChannelFullException("Can't write to a stopped channel");
- }
- return new NifiTransaction(sessionFactory.createSession(),
relationship);
- }
-
- @Override
- public void configure(Context context) {
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSinkSessionChannel.java
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSinkSessionChannel.java
deleted file mode 100644
index 5621b6dd1d..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSinkSessionChannel.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import org.apache.flume.Context;
-import org.apache.flume.channel.BasicChannelSemantics;
-import org.apache.flume.channel.BasicTransactionSemantics;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-
-public class NifiSinkSessionChannel extends BasicChannelSemantics {
-
- private ProcessSession session;
- private final Relationship success;
- private final Relationship failure;
-
- public NifiSinkSessionChannel(Relationship success, Relationship failure) {
- this.success = success;
- this.failure = failure;
- }
-
- public void setSession(ProcessSession session) {
- this.session = session;
- }
-
- @Override
- protected BasicTransactionSemantics createTransaction() {
- return new NifiSinkTransaction(session, success, failure);
- }
-
- @Override
- public void configure(Context context) {
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSinkTransaction.java
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSinkTransaction.java
deleted file mode 100644
index 9fef79710e..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSinkTransaction.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import org.apache.flume.Event;
-import org.apache.flume.channel.BasicTransactionSemantics;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processors.flume.util.FlowFileEvent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-class NifiSinkTransaction extends BasicTransactionSemantics {
- private final ProcessSession session;
- private final Relationship success;
- private final Relationship failure;
- private final List<FlowFile> flowFiles;
-
- public NifiSinkTransaction(ProcessSession session, Relationship success,
Relationship failure) {
- this.session = session;
- this.success = success;
- this.failure = failure;
- this.flowFiles = new ArrayList<>();
- }
-
- @Override
- protected void doPut(Event event) throws InterruptedException {
- AbstractFlumeProcessor.transferEvent(event, session, success);
- }
-
- @Override
- protected Event doTake() throws InterruptedException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return null;
- }
- flowFiles.add(flowFile);
-
- return new FlowFileEvent(flowFile, session);
- }
-
- @Override
- protected void doCommit() throws InterruptedException {
- session.transfer(flowFiles, success);
- session.commitAsync();
- }
-
- @Override
- protected void doRollback() throws InterruptedException {
- session.transfer(flowFiles, failure);
- session.commitAsync();
- }
-
-
-}
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
deleted file mode 100644
index 59b715c06f..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import org.apache.flume.Event;
-import org.apache.flume.channel.BasicTransactionSemantics;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-
-class NifiTransaction extends BasicTransactionSemantics {
-
- private final ProcessSession session;
- private final Relationship relationship;
-
- public NifiTransaction(ProcessSession session, Relationship relationship) {
- this.session = session;
- this.relationship = relationship;
- }
-
- @Override
- protected void doPut(Event event) throws InterruptedException {
- AbstractFlumeProcessor.transferEvent(event, session, relationship);
- }
-
- @Override
- protected Event doTake() throws InterruptedException {
- throw new UnsupportedOperationException("Only put supported");
- }
-
- @Override
- protected void doCommit() throws InterruptedException {
- session.commitAsync();
- }
-
- @Override
- protected void doRollback() throws InterruptedException {
- session.rollback();
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
deleted file mode 100644
index c9ec3d113e..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume.util;
-
-import static
org.apache.nifi.processors.flume.util.FlowFileEventConstants.ENTRY_DATE_HEADER;
-import static
org.apache.nifi.processors.flume.util.FlowFileEventConstants.ID_HEADER;
-import static
org.apache.nifi.processors.flume.util.FlowFileEventConstants.LAST_QUEUE_DATE_HEADER;
-import static
org.apache.nifi.processors.flume.util.FlowFileEventConstants.LINEAGE_START_DATE_HEADER;
-import static
org.apache.nifi.processors.flume.util.FlowFileEventConstants.SIZE_HEADER;
-
-import com.google.common.collect.Maps;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Map;
-import org.apache.flume.Event;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.stream.io.StreamUtils;
-
-public class FlowFileEvent implements Event {
-
- private final FlowFile flowFile;
- private final ProcessSession session;
-
- private final Map<String, String> headers;
- private boolean headersLoaded;
-
- private final Object bodyLock;
- private byte[] body;
- private boolean bodyLoaded;
-
- public FlowFileEvent(FlowFile flowFile, ProcessSession session) {
- this.flowFile = flowFile;
- this.session = session;
-
- headers = Maps.newHashMap();
- bodyLock = new Object();
- bodyLoaded = false;
- }
-
- @Override
- public Map<String, String> getHeaders() {
- synchronized (headers) {
- if (!headersLoaded) {
- headers.putAll(flowFile.getAttributes());
- headers.put(ENTRY_DATE_HEADER, Long.toString(flowFile.getEntryDate()));
- headers.put(ID_HEADER, Long.toString(flowFile.getId()));
- headers.put(LAST_QUEUE_DATE_HEADER,
Long.toString(flowFile.getLastQueueDate()));
- headers.put(LINEAGE_START_DATE_HEADER,
Long.toString(flowFile.getLineageStartDate()));
- headers.put(SIZE_HEADER, Long.toString(flowFile.getSize()));
- headersLoaded = true;
- }
- }
- return headers;
- }
-
- @Override
- public void setHeaders(Map<String, String> headers) {
- synchronized (this.headers) {
- this.headers.clear();
- this.headers.putAll(headers);
- headersLoaded = true;
- }
- }
-
- @Override
- public byte[] getBody() {
- synchronized (bodyLock) {
- if (!bodyLoaded) {
- if (flowFile.getSize() > Integer.MAX_VALUE) {
- throw new RuntimeException("Can't get body of Event because the
backing FlowFile is too large (" + flowFile.getSize() + " bytes)");
- }
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream((int)
flowFile.getSize());
- session.read(flowFile, new InputStreamCallback() {
-
- @Override
- public void process(InputStream in) throws IOException {
- try (BufferedInputStream input = new BufferedInputStream(in)) {
- StreamUtils.copy(input, baos);
- }
- baos.close();
- }
- });
-
- body = baos.toByteArray();
- bodyLoaded = true;
- }
- }
-
- return body;
- }
-
- @Override
- public void setBody(byte[] body) {
- synchronized (bodyLock) {
- this.body = Arrays.copyOf(body, body.length);
- bodyLoaded = true;
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
deleted file mode 100644
index 2c0dd9cbab..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume.util;
-
-
-public class FlowFileEventConstants {
-
- // FlowFile#getEntryDate();
- public static final String ENTRY_DATE_HEADER = "nifi.entry.date";
-
- // FlowFile#getId();
- public static final String ID_HEADER = "nifi.id";
-
- // FlowFile#getLastQueueDate();
- public static final String LAST_QUEUE_DATE_HEADER = "nifi.last.queue.date";
-
- // FlowFile#getLineageStartDate();
- public static final String LINEAGE_START_DATE_HEADER =
"nifi.lineage.start.date";
-
- // FlowFile#getSize();
- public static final String SIZE_HEADER = "nifi.size";
-
-}
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index f5b57e1099..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.processors.flume.ExecuteFlumeSource
-org.apache.nifi.processors.flume.ExecuteFlumeSink
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/docs/org.apache.nifi.processors.flume.ExecuteFlumeSink/additionalDetails.html
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/docs/org.apache.nifi.processors.flume.ExecuteFlumeSink/additionalDetails.html
deleted file mode 100644
index 16ae4a7481..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/docs/org.apache.nifi.processors.flume.ExecuteFlumeSink/additionalDetails.html
+++ /dev/null
@@ -1,155 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<head>
- <meta charset="utf-8" />
- <title>ExecuteFlumeSink</title>
- <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css" />
-</head>
-
-<body>
-<h2>Data Model</h2>
-<p>
- This processor executes an Apache Flume sink. FlowFiles are wrapped in
Flume's
- Event interface. The content of the FlowFile becomes the body of the Event
and
- the attributes of the FlowFile become Event headers. The following special
- headers are also set:
-</p>
-<table id="headers">
- <tr>
- <th>Flume Event Header</th>
- <th>FlowFile Attribute</th>
- </tr>
- <tr>
- <td>nifi.entry.date</td>
- <td>FlowFile#getEntryDate()</td>
- </tr>
- <tr>
- <td>nifi.id</td>
- <td>FlowFile#getId()</td>
- </tr>
- <tr>
- <td>nifi.last.queue.date</td>
- <td>FlowFile#getLastQueueDate()</td>
- </tr>
- <tr>
- <td>nifi.lineage.start.date</td>
- <td>FlowFile#getLineageStartDate()</td>
- </tr>
- <tr>
- <td>nifi.size</td>
- <td>FlowFile#getSize()</td>
- </tr>
-</table>
-<h2>Warning</h2>
-<p>
- In NiFi, the contents of a FlowFile are accessed via a stream, but in
Flume it is
- stored in a byte array. This means the full content will be loaded into
memory when
- a FlowFile is processed by the ExecuteFlumeSink processor. You should
consider the
- typical size of the FlowFiles you'll process and the batch size, if any,
your sink
- is configured with when setting NiFi's heap size.
-</p>
-<h2>Configuration Details</h2>
-<p>
- This processor is designed to execute arbitrary Flume sinks. Most of the
details
- of configuring the sink is deferred to Flume's built-in configuration
system.
- For details on the available settings for each sink type, refer to the
Flume
- <a href="http://flume.apache.org/FlumeUserGuide.html#flume-sinks">User
Guide</a>.
- Configuring the Flume sink is a four step process:
-</p>
-<ol>
- <li>Set the Sink Type property to a valid Flume sink type.</li>
- <li>
- Set the Agent Name property to the name of the agent in your
- Flume configuration. This is the prefix of the properties in the Flume
- configuration file. Example: <code>tier1</code>
- </li>
- <li>
- Set the Sink Name property to the name of the sink in your Flume
- configuration. If Agent Name is <code>tier1</code>, then the Sink Name
- is the value of the <code>tier1.sinks</code> property. Example:
<code>sink-1</code>
- </li>
- <li>
- Copy and paste the configuration for the sink from your Flume
configuration
- file into the Flume Configuration property. Assuming you're using
- the same Agent Name and Sink Name as in the examples above, this will
be all
- of the properties that start with <code>tier1.sinks.sink-1</code>.
- Do not copy the <code>tier1.sinks.sink-1.type</code> or
- <code>tier1.sinks.sink-1.channel</code> properties.
- </li>
-</ol>
-<h2>Usage Example</h2>
-<p>
- Assuming you had the following existing Flume configuration file:
-</p>
- <pre>
-a1.sources = r1
-a1.sinks = k1
-a1.channels = c1
-
-a1.sources.r1.type = netcat
-a1.sources.r1.bind = localhost
-a1.sources.r1.port = 44444
-
-a1.sinks.k1.type = hdfs
-a1.sinks.k1.channel = c1
-a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
-a1.sinks.k1.hdfs.filePrefix = events-
-a1.sinks.k1.hdfs.round = true
-a1.sinks.k1.hdfs.roundValue = 10
-a1.sinks.k1.hdfs.roundUnit = minute
-
-a1.channels.c1.type = memory
-a1.channels.c1.capacity = 1000
-a1.channels.c1.transactionCapacity = 100
-
-a1.sources.r1.channels = c1
-a1.sinks.k1.channel = c1</pre>
-<p>
- Then you'd configure the ExecuteFlumeSink as follows:
-</p>
-<table id="example">
- <tr>
- <th>Property</th>
- <th>Value</th>
- </tr>
- <tr>
- <td>Sink Type</td>
- <td>hdfs</td>
- </tr>
- <tr>
- <td>Agent Name</td>
- <td>a1</td>
- </tr>
- <tr>
- <td>Sink Name</td>
- <td>k1</td>
- </tr>
- <tr>
- <td>Flume Configuration</td>
- <td>
- <code>
- a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S<br>
- a1.sinks.k1.hdfs.filePrefix = events-<br>
- a1.sinks.k1.hdfs.round = true<br>
- a1.sinks.k1.hdfs.roundValue = 10<br>
- a1.sinks.k1.hdfs.roundUnit = minute
- </code>
- </td>
- </tr>
-</table>
-</body>
-</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/docs/org.apache.nifi.processors.flume.ExecuteFlumeSource/additionalDetails.html
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/docs/org.apache.nifi.processors.flume.ExecuteFlumeSource/additionalDetails.html
deleted file mode 100644
index bb4c1359e5..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/resources/docs/org.apache.nifi.processors.flume.ExecuteFlumeSource/additionalDetails.html
+++ /dev/null
@@ -1,114 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<head>
- <meta charset="utf-8" />
- <title>ExecuteFlumeSource</title>
- <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css" />
-</head>
-
-<body>
-<h2>Data Model</h2>
-<p>
- This processor executes an Apache Flume source. Each Flume Event is turned
into
- a FlowFile. The content of the FlowFile is set to the body of the Event and
- the Event headers become FlowFile attributes.
-</p>
-<h2>Configuration Details</h2>
-<p>
- This processor is designed to execute arbitrary Flume sources. Most of the
details
- of configuring the source is deferred to Flume's built-in configuration
system.
- For details on the available settings for each source type, refer to the
Flume
- <a href="http://flume.apache.org/FlumeUserGuide.html#flume-sources">User
Guide</a>.
- Configuring the Flume source is a four step process:
-</p>
-<ol>
- <li>Set the Source Type property to a valid Flume source type.</li>
- <li>
- Set the Agent Name property to the name of the agent in your
- Flume configuration. This is the prefix of the properties in the Flume
- configuration file. Example: <code>tier1</code>
- </li>
- <li>
- Set the Source Name property to the name of the source in your Flume
- configuration. If Agent Name is <code>tier1</code>, then the Source
Name
- is the value of the <code>tier1.sources</code> property. Example:
<code>src-1</code>
- </li>
- <li>
- Copy and paste the configuration for the source from your Flume
configuration
- file into the Flume Configuration property. Assuming you're using
- the same Agent Name and Source Name as in the examples above, this
will be all
- of the properties that start with <code>tier1.sources.src-1</code>.
- Do not copy the <code>tier1.sources.src-1.type</code> or
- <code>tier1.sources.src-1.channel</code> properties.
- </li>
-</ol>
-<h2>Usage Example</h2>
-<p>
- Assuming you had the following existing Flume configuration file:
-</p>
- <pre>
-a1.sources = r1
-a1.sinks = k1
-a1.channels = c1
-
-a1.sources.r1.type = multiport_syslogtcp
-a1.sources.r1.channels = c1
-a1.sources.r1.host = 0.0.0.0
-a1.sources.r1.ports = 10001 10002 10003
-a1.sources.r1.portHeader = port
-
-a1.sinks.k1.type = logger
-
-a1.channels.c1.type = memory
-a1.channels.c1.capacity = 1000
-a1.channels.c1.transactionCapacity = 100
-
-a1.sources.r1.channels = c1
-a1.sinks.k1.channel = c1</pre>
-<p>
- Then you'd configure the ExecuteFlumeSource as follows:
-</p>
-<table id="example">
- <tr>
- <th>Property</th>
- <th>Value</th>
- </tr>
- <tr>
- <td>Source Type</td>
- <td>multiport_syslogtcp</td>
- </tr>
- <tr>
- <td>Agent Name</td>
- <td>a1</td>
- </tr>
- <tr>
- <td>Source Name</td>
- <td>r1</td>
- </tr>
- <tr>
- <td>Flume Configuration</td>
- <td>
- <code>
- a1.sources.r1.host = 0.0.0.0<br>
- a1.sources.r1.ports = 10001 10002 10003<br>
- a1.sources.r1.portHeader = port
- </code>
- </td>
- </tr>
-</table>
-</body>
-</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java
b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java
deleted file mode 100644
index 02d7766168..0000000000
---
a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import org.apache.commons.io.filefilter.HiddenFileFilter;
-import org.apache.flume.sink.NullSink;
-import org.apache.flume.source.AvroSource;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.apache.nifi.util.file.FileUtils;
-import org.apache.nifi.util.security.MessageDigestUtils;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class ExecuteFlumeSinkTest {
-
- private static final Logger logger =
- LoggerFactory.getLogger(ExecuteFlumeSinkTest.class);
-
- @Test
- public void testValidators() {
- TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class);
- Collection<ValidationResult> results;
- ProcessContext pc;
-
- results = new HashSet<>();
- runner.enqueue(new byte[0]);
- pc = runner.getProcessContext();
- if (pc instanceof MockProcessContext) {
- results = ((MockProcessContext) pc).validate();
- }
- assertEquals(1, results.size());
- for (ValidationResult vr : results) {
- logger.debug(vr.toString());
- assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
- }
-
- // non-existent class
- results = new HashSet<>();
- runner.setProperty(ExecuteFlumeSink.SINK_TYPE, "invalid.class.name");
- runner.enqueue(new byte[0]);
- pc = runner.getProcessContext();
- if (pc instanceof MockProcessContext) {
- results = ((MockProcessContext) pc).validate();
- }
- assertEquals(1, results.size());
- for (ValidationResult vr : results) {
- logger.debug(vr.toString());
- assertTrue(vr.toString().contains("is invalid because unable to load sink"));
- }
-
- // class doesn't implement Sink
- results = new HashSet<>();
- runner.setProperty(ExecuteFlumeSink.SINK_TYPE, AvroSource.class.getName());
- runner.enqueue(new byte[0]);
- pc = runner.getProcessContext();
- if (pc instanceof MockProcessContext) {
- results = ((MockProcessContext) pc).validate();
- }
- assertEquals(1, results.size());
- for (ValidationResult vr : results) {
- logger.debug(vr.toString());
- assertTrue(vr.toString().contains("is invalid because unable to create sink"));
- }
-
- results = new HashSet<>();
- runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName());
- runner.enqueue(new byte[0]);
- pc = runner.getProcessContext();
- if (pc instanceof MockProcessContext) {
- results = ((MockProcessContext) pc).validate();
- }
- assertEquals(0, results.size());
- }
-
-
- @Test
- public void testNullSink() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class);
- runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName());
- try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) {
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
- runner.enqueue(inputStream, attributes);
- runner.run();
- }
- }
-
- @Test
- public void testBatchSize() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class);
- runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName());
- runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG,
- "tier1.sinks.sink-1.batchSize = 1000\n");
- for (int i = 0; i < 100000; i++) {
- runner.enqueue(String.valueOf(i).getBytes());
- }
- runner.run(100);
- }
-
- @Test
- @Disabled("Does not work on Windows")
- public void testHdfsSink(@TempDir Path temp) throws IOException {
- File destDir = temp.resolve("hdfs").toFile();
- destDir.mkdirs();
-
- TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class);
- runner.setProperty(ExecuteFlumeSink.SINK_TYPE, "hdfs");
- runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG,
- "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" +
- "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" +
- "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" +
- "tier1.sinks.sink-1.serializer.appendNewline = false"
- );
- try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) {
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
- runner.enqueue(inputStream, attributes);
- runner.run();
- }
-
- File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE);
- assertEquals(1, files.length, "Unexpected number of destination files.");
- File dst = files[0];
- byte[] expectedDigest;
- try (InputStream resourceStream = getClass().getResourceAsStream("/testdata/records.txt")) {
- expectedDigest = MessageDigestUtils.getDigest(resourceStream);
- }
- byte[] actualDigest = FileUtils.computeDigest(dst);
- assertArrayEquals(expectedDigest, actualDigest, "Destination file doesn't match source data");
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java
deleted file mode 100644
index 7c21fb5580..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-
-import org.apache.flume.sink.NullSink;
-import org.apache.flume.source.AvroSource;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class ExecuteFlumeSourceTest {
-
- private static final Logger logger = LoggerFactory.getLogger(ExecuteFlumeSourceTest.class);
-
- @Test
- public void testValidators() {
- TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class);
- Collection<ValidationResult> results;
- ProcessContext pc;
-
- results = new HashSet<>();
- runner.enqueue(new byte[0]);
- pc = runner.getProcessContext();
- if (pc instanceof MockProcessContext) {
- results = ((MockProcessContext) pc).validate();
- }
- assertEquals(1, results.size());
- for (ValidationResult vr : results) {
- logger.debug(vr.toString());
- assertTrue(vr.toString().contains("is invalid because Source Type is required"));
- }
-
- // non-existent class
- results = new HashSet<>();
- runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "invalid.class.name");
- runner.enqueue(new byte[0]);
- pc = runner.getProcessContext();
- if (pc instanceof MockProcessContext) {
- results = ((MockProcessContext) pc).validate();
- }
- assertEquals(1, results.size());
- for (ValidationResult vr : results) {
- logger.debug(vr.toString());
- assertTrue(vr.toString().contains("is invalid because unable to load source"));
- }
-
- // class doesn't implement Source
- results = new HashSet<>();
- runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, NullSink.class.getName());
- runner.enqueue(new byte[0]);
- pc = runner.getProcessContext();
- if (pc instanceof MockProcessContext) {
- results = ((MockProcessContext) pc).validate();
- }
- assertEquals(1, results.size());
- for (ValidationResult vr : results) {
- logger.debug(vr.toString());
- assertTrue(vr.toString().contains("is invalid because unable to create source"));
- }
-
- results = new HashSet<>();
- runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, AvroSource.class.getName());
- runner.enqueue(new byte[0]);
- pc = runner.getProcessContext();
- if (pc instanceof MockProcessContext) {
- results = ((MockProcessContext) pc).validate();
- }
- assertEquals(0, results.size());
- }
-
- @Test
- public void testSequenceSource() {
- TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class);
- runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "seq");
- runner.run();
- List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS);
- assertEquals(1, flowFiles.size());
- for (MockFlowFile flowFile : flowFiles) {
- logger.debug(flowFile.toString());
- assertEquals(1, flowFile.getSize());
- }
- }
-
- @Test
- @Disabled("Does not work on Windows")
- public void testSourceWithConfig(@TempDir Path temp) throws IOException {
- File spoolDirectory = temp.resolve("spooldir").toFile();
- spoolDirectory.mkdirs();
- File dst = new File(spoolDirectory, "records.txt");
- FileUtils.copyFile(getClass().getResourceAsStream("/testdata/records.txt"), dst, true, false);
-
- TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class);
- runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "spooldir");
- runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG,
- "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath());
- runner.run(1, false, true);
- // Because the spool directory source is an event driven source, it may take some time for flow files to get
- // produced. I'm willing to wait up to 5 seconds, but will bail out early if possible. If it takes longer than
- // that then there is likely a bug.
- int numWaits = 10;
- while (runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS).size() < 4 && --numWaits > 0) {
- try {
- TimeUnit.MILLISECONDS.sleep(500);
- } catch (InterruptedException ex) {
- logger.warn("Sleep interrupted");
- }
- }
- runner.shutdown();
- runner.assertTransferCount(ExecuteFlumeSource.SUCCESS, 4);
- int i = 1;
- for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS)) {
- flowFile.assertContentEquals("record " + i);
- i++;
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml
deleted file mode 100644
index e06a193353..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site-broken.xml
+++ /dev/null
@@ -1,25 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
- <property>
- <name>fs.default.name</name>
- <value>hdfs://localhost:65535</value>
- </property>
-</configuration>
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
deleted file mode 100644
index 849854b77a..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!-- Put site-specific property overrides in this file. -->
-
-<configuration>
- <property>
- <name>fs.defaultFS</name>
- <!--
- Hadoop doesn't support a chroot style operation for the
- local filesystem so there's no benefit to setting this
- to a directory other than '/'
- -->
- <value>file:///</value>
- </property>
-</configuration>
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties
deleted file mode 100644
index e3d4fc1bdf..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.slf4j.simpleLogger.logFile=System.out
-org.slf4j.simpleLogger.defaultLogLevel=info
-org.slf4j.simpleLogger.showDateTime=true
-org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS
-org.slf4j.simpleLogger.levelInBrackets=true
-org.slf4j.simpleLogger.log.org.apache.nifi.processors.flume=debug
-org.slf4j.simpleLogger.log.org.apache.flume=debug
diff --git a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt b/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt
deleted file mode 100644
index 5a809eee88..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/testdata/records.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-record 1
-record 2
-record 3
-record 4
diff --git a/nifi-nar-bundles/nifi-flume-bundle/pom.xml b/nifi-nar-bundles/nifi-flume-bundle/pom.xml
deleted file mode 100644
index 86199999b3..0000000000
--- a/nifi-nar-bundles/nifi-flume-bundle/pom.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-nar-bundles</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>nifi-flume-bundle</artifactId>
- <packaging>pom</packaging>
- <description>A bundle of processors that run Flume sources/sinks</description>
- <modules>
- <module>nifi-flume-processors</module>
- <module>nifi-flume-nar</module>
- </modules>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-flume-processors</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
- <!-- Override Netty 3 -->
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${netty.3.version}</version>
- </dependency>
- <!-- Override Tomcat Embed Core 8.5.46 from Flume 1.10.0 -->
- <dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-core</artifactId>
- <version>8.5.85</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-</project>
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 68bfaee164..a76d1aac05 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -45,7 +45,6 @@
<module>nifi-ccda-bundle</module>
<module>nifi-language-translation-bundle</module>
<module>nifi-mongodb-bundle</module>
- <module>nifi-flume-bundle</module>
<module>nifi-hbase-bundle</module>
<module>nifi-ambari-bundle</module>
<module>nifi-asana-bundle</module>