[
https://issues.apache.org/jira/browse/NIFI-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250518#comment-15250518
]
ASF GitHub Bot commented on NIFI-981:
-------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/372#discussion_r60468215
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.hive.HiveDBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SeeAlso(ExecuteHiveQL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"sql", "hive", "put", "database", "update", "insert"})
+@CapabilityDescription("Executes a HiveQL DDL/DML command (UPDATE, INSERT, e.g.). The content of an incoming FlowFile is expected to be the HiveQL command "
+        + "to execute. The HiveQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+        + "with the naming convention hiveql.args.N.type and hiveql.args.N.value, where N is a positive integer. The hiveql.args.N.type is expected to be "
+        + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "hiveql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized HiveQL statements. The type of each Parameter is specified as an integer "
+                + "that represents the JDBC Type of the parameter."),
+        @ReadsAttribute(attribute = "hiveql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized HiveQL statements. The value of the Parameters are specified as "
+                + "hiveql.args.1.value, hiveql.args.2.value, hiveql.args.3.value, and so on. The type of the hiveql.args.1.value Parameter is specified by the hiveql.args.1.type attribute.")
+})
+public class PutHiveQL extends AbstractHiveQLProcessor {
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The preferred number of FlowFiles to put to the database in a single transaction")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the record data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after the database is successfully updated")
+            .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+                    + "such as an invalid query or an integrity constraint violation")
+            .build();
+
+    private static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type");
+    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+ private final static Set<Relationship> relationships;
+
+ /*
+     * Will ensure that the list of property descriptors is built only once.
+ * Will also create a Set of relationships
+ */
+ static {
+ List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+ _propertyDescriptors.add(HIVE_DBCP_SERVICE);
+ _propertyDescriptors.add(BATCH_SIZE);
+ _propertyDescriptors.add(CHARSET);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+ Set<Relationship> _relationships = new HashSet<>();
+ _relationships.add(REL_SUCCESS);
+ _relationships.add(REL_FAILURE);
+ _relationships.add(REL_RETRY);
+ relationships = Collections.unmodifiableSet(_relationships);
+ }
--- End diff ---
This looks familiar. Consider making it consistent across
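For context, here is a minimal, hypothetical sketch (not part of this PR) of the attribute convention documented in the @CapabilityDescription above: a parameterized HiveQL statement as the FlowFile content plus hiveql.args.N.type / hiveql.args.N.value attributes, with the type values taken from the standard java.sql.Types constants. The table name and values are made up for illustration.

import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

public class HiveQLArgsExample {
    public static void main(String[] args) {
        // FlowFile content: a parameterized HiveQL statement with two ? placeholders
        String hiveQl = "INSERT INTO users (id, name) VALUES (?, ?)";

        // FlowFile attributes supplying the parameters, following the
        // hiveql.args.N.type / hiveql.args.N.value naming convention
        Map<String, String> attributes = new HashMap<>();
        attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); // 4
        attributes.put("hiveql.args.1.value", "1");
        attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); // 12
        attributes.put("hiveql.args.2.value", "alice");

        System.out.println(hiveQl);
        System.out.println(attributes);
    }
}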
> Add support for Hive JDBC / ExecuteSQL
> --------------------------------------
>
> Key: NIFI-981
> URL: https://issues.apache.org/jira/browse/NIFI-981
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Joseph Witt
> Assignee: Matt Burgess
>
> In this mailing list thread from September 2015 "NIFI DBCP connection pool
> not working for hive" the main thrust of the conversation is to provide
> proper support for delivering data to hive. Hive's jdbc driver appears to
> have dependencies on Hadoop libraries. We need to be careful/thoughtful
> about how to best support this so that different versions of Hadoop distros
> can be supported (potentially in parallel on the same flow).
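As a rough illustration of the plain JDBC path the thread is discussing, a minimal sketch assuming a HiveServer2 instance at localhost:10000 and the standard org.apache.hive.jdbc.HiveDriver on the classpath (the database, user, table, and values are hypothetical, and the statement mirrors the attribute example above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class HiveJdbcSketch {
    public static void main(String[] args) throws Exception {
        // Standard HiveServer2 JDBC driver; hive-jdbc and its Hadoop
        // dependencies must be on the classpath, which is the packaging
        // concern raised in this issue.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "nifi", "");
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO users (id, name) VALUES (?, ?)")) {
            stmt.setInt(1, 1);          // bind the first ? placeholder
            stmt.setString(2, "alice"); // bind the second ? placeholder
            stmt.execute();
        }
    }
}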
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)