[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15418453#comment-15418453
 ] 

ASF GitHub Bot commented on FLINK-2055:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2332#discussion_r74549599
  
    --- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/MutationActionList.java
 ---
    @@ -0,0 +1,371 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.hbase;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.hadoop.hbase.client.Append;
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Durability;
    +import org.apache.hadoop.hbase.client.Increment;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Put;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + *  This class represents a list of {@link MutationAction}s you will take 
when writing
    + *  an input value of {@link HBaseSink} to a row in a HBase table.
    + *  Each {@link MutationAction} can create an HBase {@link Mutation} 
operation type
    + *  including {@link Put}, {@link Increment}, {@link Append} and {@link 
Delete}.
    + */
    +public class MutationActionList {
    +   private final List<MutationAction> actions;
    +
    +   public MutationActionList() {
    +           this.actions = new ArrayList<>();
    +   }
    +
    +   public List<MutationAction> getActions() {
    +           return this.actions;
    +   }
    +
    +   /**
    +    * Create a new list of HBase {@link Mutation}s.
    +    *
    +    * @param rowKey row that the created {@link Mutation} list is applied 
to
    +    * @param writeToWAL enable WAL
    +    * @return a list of HBase {@link Mutation}s
    +    */
    +   public List<Mutation> newMutationList(byte[] rowKey, boolean 
writeToWAL) {
    +           List<Mutation> mutations = new ArrayList<>();
    +           Put put = null;
    +           Increment increment = null;
    +           Append append = null;
    +           Delete delete = null;
    +           boolean rowIsDeleted = false;
    +           for (MutationAction action : actions) {
    +                   switch (action.getType()) {
    +                           case PUT:
    +                                   if (put == null) {
    +                                           put = new Put(rowKey);
    +                                           mutations.add(put);
    +                                   }
    +                                   if (action.getTs() == -1) {
    +                                           
put.addColumn(action.getFamily(), action.getQualifier(), action.getValue());
    +                                   } else {
    +                                           
put.addColumn(action.getFamily(), action.getQualifier(), action.getTs(), 
action.getValue());
    +                                   }
    +                                   break;
    +
    +                           case INCREMENT:
    +                                   if (increment == null) {
    +                                           increment = new 
Increment(rowKey);
    +                                           mutations.add(increment);
    +                                   }
    +                                   increment.addColumn(action.getFamily(), 
action.getQualifier(), action.getIncrement());
    +                                   break;
    +
    +                           case APPEND:
    +                                   if (append == null) {
    +                                           append = new Append(rowKey);
    +                                           mutations.add(append);
    +                                   }
    +                                   append.add(action.getFamily(), 
action.getQualifier(), action.getValue());
    +                                   break;
    +
    +                           // If there are multiple DELETE_ROW actions, 
only the first one is served
    +                           case DELETE_ROW:
    +                                   if (!rowIsDeleted) {
    +                                           for (int i = 0; i < 
mutations.size(); ) {
    +                                                   if (mutations.get(i) 
instanceof Delete) {
    +                                                           
mutations.remove(i);
    +                                                   } else {
    +                                                           i++;
    +                                                   }
    +                                           }
    +                                           delete = new Delete(rowKey, 
action.getTs());
    +                                           mutations.add(delete);
    +                                           rowIsDeleted = true;
    +                                   }
    +                                   break;
    +
    +                           case DELETE_FAMILY:
    +                                   if (!rowIsDeleted) {
    +                                           if (delete == null) {
    +                                                   delete = new 
Delete(rowKey);
    +                                                   mutations.add(delete);
    +                                           }
    +                                           
delete.addFamily(action.getFamily(), action.getTs());
    +                                   }
    +                                   break;
    +
    +                           case DELETE_COLUMNS:
    +                                   if (!rowIsDeleted) {
    +                                           if (delete == null) {
    +                                                   delete = new 
Delete(rowKey);
    +                                                   mutations.add(delete);
    +                                           }
    +                                           
delete.addColumns(action.getFamily(), action.getQualifier(), action.getTs());
    --- End diff --
    
    If action.getTs() == -1, call delete.addColumns(byte[] fam, byte[] qual) so 
that all versions of a specified column can be deleted.


> Implement Streaming HBaseSink
> -----------------------------
>
>                 Key: FLINK-2055
>                 URL: https://issues.apache.org/jira/browse/FLINK-2055
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming, Streaming Connectors
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Hilmi Yildirim
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to