[
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15418453#comment-15418453
]
ASF GitHub Bot commented on FLINK-2055:
---------------------------------------
Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2332#discussion_r74549599
--- Diff:
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class represents a list of {@link MutationAction}s you will take
when writing
+ * an input value of {@link HBaseSink} to a row in an HBase table.
+ * Each {@link MutationAction} can create an HBase {@link Mutation}
operation type
+ * including {@link Put}, {@link Increment}, {@link Append} and {@link
Delete}.
+ */
+public class MutationActionList {
+ private final List<MutationAction> actions;
+
+ /** Creates a new, initially empty list of mutation actions. */
+ public MutationActionList() {
+ actions = new ArrayList<>();
+ }
+
+ /**
+  * Returns the list of {@link MutationAction}s held by this object.
+  *
+  * <p>NOTE(review): this returns the live internal list, not a copy —
+  * callers presumably populate the list through it; confirm before
+  * changing it to an unmodifiable view.
+  *
+  * @return the backing list of mutation actions
+  */
+ public List<MutationAction> getActions() {
+ return this.actions;
+ }
+
+ /**
+ * Create a new list of HBase {@link Mutation}s.
+ *
+ * @param rowKey row that the created {@link Mutation} list is applied
to
+ * @param writeToWAL enable WAL
+ * @return a list of HBase {@link Mutation}s
+ */
+ public List<Mutation> newMutationList(byte[] rowKey, boolean
writeToWAL) {
+ List<Mutation> mutations = new ArrayList<>();
+ Put put = null;
+ Increment increment = null;
+ Append append = null;
+ Delete delete = null;
+ boolean rowIsDeleted = false;
+ for (MutationAction action : actions) {
+ switch (action.getType()) {
+ case PUT:
+ if (put == null) {
+ put = new Put(rowKey);
+ mutations.add(put);
+ }
+ if (action.getTs() == -1) {
+
put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
+ } else {
+
put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(),
action.getValue());
+ }
+ break;
+
+ case INCREMENT:
+ if (increment == null) {
+ increment = new
Increment(rowKey);
+ mutations.add(increment);
+ }
+ increment.addColumn(action.getFamily(),
action.getQualifier(), action.getIncrement());
+ break;
+
+ case APPEND:
+ if (append == null) {
+ append = new Append(rowKey);
+ mutations.add(append);
+ }
+ append.add(action.getFamily(),
action.getQualifier(), action.getValue());
+ break;
+
+ // If there are multiple DELETE_ROW actions,
only the first one is served
+ case DELETE_ROW:
+ if (!rowIsDeleted) {
+ for (int i = 0; i <
mutations.size(); ) {
+ if (mutations.get(i)
instanceof Delete) {
+
mutations.remove(i);
+ } else {
+ i++;
+ }
+ }
+ delete = new Delete(rowKey,
action.getTs());
+ mutations.add(delete);
+ rowIsDeleted = true;
+ }
+ break;
+
+ case DELETE_FAMILY:
+ if (!rowIsDeleted) {
+ if (delete == null) {
+ delete = new
Delete(rowKey);
+ mutations.add(delete);
+ }
+
delete.addFamily(action.getFamily(), action.getTs());
+ }
+ break;
+
+ case DELETE_COLUMNS:
+ if (!rowIsDeleted) {
+ if (delete == null) {
+ delete = new
Delete(rowKey);
+ mutations.add(delete);
+ }
+
delete.addColumns(action.getFamily(), action.getQualifier(), action.getTs());
--- End diff --
If {@code action.getTs() == -1}, call {@code delete.addColumns(byte[] family,
byte[] qualifier)} instead, so that all versions of the specified column are
deleted rather than only those at the sentinel timestamp.
> Implement Streaming HBaseSink
> -----------------------------
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
> Issue Type: New Feature
> Components: Streaming, Streaming Connectors
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Assignee: Hilmi Yildirim
>
> As per :
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)