[ https://issues.apache.org/jira/browse/BAHIR-99?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097663#comment-16097663 ]

ASF GitHub Bot commented on BAHIR-99:
-------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/17#discussion_r128921944
  
    --- Diff: 
flink-connector-kudu/src/main/java/es/accenture/flink/Utils/Utils.java ---
    @@ -0,0 +1,447 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package es.accenture.flink.Utils;
    +
    +import es.accenture.flink.Sink.KuduOutputFormat;
    +import es.accenture.flink.Utils.Exceptions.KuduClientException;
    +import es.accenture.flink.Utils.Exceptions.KuduTableException;
    +import org.apache.kudu.ColumnSchema;
    +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
    +import org.apache.kudu.Schema;
    +import org.apache.kudu.Type;
    +import org.apache.kudu.client.*;
    +import org.apache.log4j.Logger;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class Utils {
    +
    +    //Kudu variables
    +    private KuduClient client;
    +    private KuduSession session;
    +
    +    // LOG4J
    +    private final static Logger logger = Logger.getLogger(Utils.class);
    +
    +    /**
    +     * Builder Util Class which creates a Kudu client and log in to be 
able to perform operations later
    +     * @param host Kudu's host
    +     * @throws KuduClientException In case of exception caused by Kudu 
Client
    +     */
    +    public Utils(String host) throws KuduClientException {
    +        this.client = new KuduClient.KuduClientBuilder(host).build();
    +        if (client == null){
    +            throw new KuduClientException("ERROR: param \"host\" not 
valid, can't establish connection");
    +        }
    +        this.session = this.client.newSession();
    +    }
    +
    +    /**
    +     * Return an instance of the table indicated in the settings
    +     *
    +     * In case that the table exists, return an instance of the table
    +     * In case that the table doesn't exist, create a new table with the 
data provided and return an instance
    +     * In both cases,takes into account the way of the table to perfom 
some operations or others
    +     *
    +     *     If the mode is CREATE:
    +     *
    +     *         If the table exists: return error (Can not create table 
that already exists)
    +     *         If the table doesn't exist and  the list of column names 
has not been provided: return error
    +     *         If the table doesn't exist and  the list of column names 
has been provided: create a new table with data provided and return an instance
    +     *
    +     *    If the mode is APPEND:
    +     *
    +     *        If the table exists: return the instance in the table
    +     *        If the table doesn't exist: return error
    +     *
    +     *    If the mode is OVERRIDE:
    +     *
    +     *        If the table exist: delete all rows of this table and return 
an instance of it
    +     *        If the table doesn't exist: return error
    +     *
    +     *
    +     * @param tableName             Table name to use
    +     * @param tableMode             Operations mode for operate with the 
table (CREATE, APPEND, OVERRIDE)
    +     * @return                      Instance of the table indicated
    +     * @throws KuduTableException   In case of can't access to a table o 
can't create it (wrong params or not existing table)
    +     * @throws KuduException        In case of error of Kudu
    +     */
    +    public KuduTable useTable(String tableName, Integer tableMode) throws 
KuduTableException, KuduException {
    +        KuduTable table;
    +
    +        if (tableMode == KuduOutputFormat.CREATE) {
    +            logger.error("Bad call method, use useTable(String tableName, 
String [] fieldsNames, RowSerializable row) instead");
    +            table = null;
    +        }else if (tableMode == KuduOutputFormat.APPEND) {
    +            logger.info("Modo APPEND");
    +            try {
    +                if (client.tableExists(tableName)) {
    +                    //logger.info("SUCCESS: There is the table with the 
name \"" + tableName + "\"");
    +                    table = client.openTable(tableName);
    +                } else {
    +                    logger.error("ERROR: The table doesn't exist");
    +                    throw new KuduTableException("ERROR: The table doesn't 
exist, so can't do APPEND operation");
    +                }
    +            } catch (Exception e) {
    +                throw new KuduTableException("ERROR: param \"host\" not 
valid, can't establish connection");
    +            }
    +        }else if (tableMode == KuduOutputFormat.OVERRIDE) {
    +            logger.info("Modo OVERRIDE");
    +            try {
    +                if (client.tableExists(tableName)) {
    +                    logger.info("SUCCESS: There is the table with the name 
\"" + tableName + "\". Emptying the table");
    +                    clearTable(tableName);
    +                    table = client.openTable(tableName);
    +                } else {
    +                    logger.error("ERROR: The table doesn't exist");
    +                    throw new KuduTableException("ERROR: The table doesn't 
exist, so can't do OVERRIDE operation");
    +                }
    +            } catch (Exception e) {
    +                throw new KuduTableException("ERROR: param \"host\" not 
valid, can't establish connection");
    +            }
    +        }else {
    +            throw new KuduTableException("ERROR: Incorrect parameters, 
please check the constructor method. Incorrect \"tableMode\" parameter.");
    +        }
    +        return table;
    +    }
    +
    +    /**
    +     * Returns an instance of the table requested in parameters
    +     * If the table exists, returns an instance of the table
    +     * If the table doesn't exist, creates a new table with the data 
provided and returns an instance
    +     *
    +     * @param tableName     Table name to use
    +     * @param fieldsNames   List of names of columns of the table (to 
create table)
    +     * @param row           List of values to insert a row in the table 
(to know the types of columns)
    +     * @return              Instance of the table indicated
    +     * @throws IllegalArgumentException In case of wrong parameters
    +     * @throws KuduException    In case of exception caused by Kudu
    +     */
    +    public KuduTable useTable(String tableName, String [] fieldsNames, 
RowSerializable row) throws IllegalArgumentException, KuduException {
    +        KuduTable table;
    +
    +        if (client.tableExists(tableName)){
    +            logger.info("The table exists");
    +            table = client.openTable(tableName);
    +        } else {
    +            if (tableName == null || tableName.equals("")) {
    +                throw new IllegalArgumentException("ERROR: Incorrect 
parameters, please check the constructor method. Incorrect \"tableName\" 
parameter.");
    +
    +            } else if (fieldsNames == null || fieldsNames[0].isEmpty()) {
    +                throw new IllegalArgumentException("ERROR: Incorrect 
parameters, please check the constructor method. Missing \"fields\" 
parameter.");
    +
    +            } else if (row == null){
    +                throw new IllegalArgumentException("ERROR: Incorrect 
parameters, please check the constructor method. Incorrect \"row\" parameter.");
    +
    +            } else {
    +                logger.info("The table doesn't exist");
    +                table = createTable(tableName, fieldsNames, row);
    +            }
    +        }
    +        return table;
    +    }
    +    /**
    +     * Create a new Kudu table and return the instance of this table
    +     *
    +     * @param tableName     name of the table to create
    +     * @param fieldsNames   list name columns of the table
    +     * @param row           list of values to insert a row in the table( 
to know the types of columns)
    +     * @return              instance of the table indicated
    +     * @throws KuduException In case of exception caused by Kudu
    +     */
    +    public KuduTable createTable (String tableName, String [] fieldsNames, 
RowSerializable row) throws KuduException {
    +
    +        if(client.tableExists(tableName))
    +            return client.openTable(tableName);
    +
    +
    +        List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
    +        List<String> rangeKeys = new ArrayList<String>(); // Primary key
    +        rangeKeys.add(fieldsNames[0]);
    +
    +        logger.info("Creating the table \"" + tableName + "\"...");
    +        for (int i = 0; i < fieldsNames.length; i++){
    +            ColumnSchema col;
    +            String colName = fieldsNames[i];
    +            Type colType = getRowsPositionType(i, row);
    +
    +            if (colName.equals(fieldsNames[0])) {
    +                col = new ColumnSchemaBuilder(colName, 
colType).key(true).build();
    +                columns.add(0, col);//To create the table, the key must be 
the first in the column list otherwise it will give a failure
    +            } else {
    +                col = new ColumnSchemaBuilder(colName, colType).build();
    +                columns.add(col);
    +            }
    +        }
    +        Schema schema = new Schema(columns);
    +
    +        if(!client.tableExists(tableName))
    +            client.createTable(tableName, schema, new 
CreateTableOptions().setRangePartitionColumns(rangeKeys).addHashPartitions(rangeKeys,
 4));
    +        //logger.info("SUCCESS: The table has been created successfully");
    +
    +
    +        return client.openTable(tableName);
    +    }
    +    /**
    +     * Delete the indicated table
    +     *
    +     * @param tableName name table to delete
    +     */
    +    public void deleteTable (String tableName){
    +
    +        logger.info("Deleting the table \"" + tableName + "\"...");
    +        try {
    +            if(client.tableExists(tableName)) {
    +                client.deleteTable(tableName);
    +                logger.info("SUCCESS: Table deleted successfully");
    +            }
    +        } catch (KuduException e) {
    +            logger.error("The table \"" + tableName  +"\" doesn't exist, 
so can't be deleted.", e);
    +        }
    +    }
    +
    +    /**
    +     * Return the type of the value of the position "pos", like the class 
object "Type"
    +     *
    +     * @param pos   Row position
    +     * @param row   list of values to insert a row in the table
    +     * @return      element type "pos"-esimo of "row"
    +     */
    +    public Type getRowsPositionType (int pos, RowSerializable row){
    +        Type colType = null;
    +        switch(row.productElement(pos).getClass().getName()){
    +            case "java.lang.String":
    +                colType = Type.STRING;
    +                break;
    +            case "java.lang.Integer":
    +                colType = Type.INT32;
    +                break;
    +            case "java.lang.Boolean":
    +                colType = Type.BOOL;
    +                break;
    +            default:
    +                break;
    +        }
    +        return colType;
    +    }
    +
    +    /**
    +     * Return a list with all rows of the indicated table
    +     *
    +     * @param tableName Table name to read
    +     * @return          List of rows in the table(object Row)
    +     * @throws KuduException In case of exception caused by Kudu
    +     */
    +    public List<RowSerializable> readTable (String tableName) throws 
KuduException {
    +
    +        KuduTable table = client.openTable(tableName);
    +        KuduScanner scanner = client.newScannerBuilder(table).build();
    +        //Obtain the column name list
    +        String[] columnsNames = getNamesOfColumns(table);
    +        //The list return all rows
    +        List<RowSerializable> rowsList = new ArrayList<>();
    +
    +        int posRow = 0;
    +        while (scanner.hasMoreRows()) {
    +            for (RowResult row : scanner.nextRows()) { //Get the rows
    +                RowSerializable rowToInsert = new 
RowSerializable(columnsNames.length);
    +                for (String col : columnsNames) { //For each column, it's 
type determined and this is how to read it
    +
    +                    String colType = row.getColumnType(col).getName();
    +                    switch (colType) {
    +                        case "string":
    +                            rowToInsert.setField(posRow, 
row.getString(col));
    +                            posRow++;
    +                            break;
    +                        case "int32":
    +                            rowToInsert.setField(posRow, row.getInt(col));
    +                            posRow++;
    +                            break;
    +                        case "bool":
    +                            rowToInsert.setField(posRow, 
row.getBoolean(col));
    +                            posRow++;
    +                            break;
    +                        default:
    +                            break;
    +                    }
    +                }
    +                rowsList.add(rowToInsert);
    +                posRow = 0;
    +            }
    +        }
    +        return rowsList;
    +    }
    +
    +
    +
    +    /**
    +     * Return a list with all rows of the indicated table
    +     *
    +     * @param tableName Table name to read
    +     * @throws KuduException In case of exception caused by Kudu
    +     */
    +    public void readTablePrint (String tableName) throws KuduException {
    +        KuduTable table = client.openTable(tableName);
    +        KuduScanner scanner = client.newScannerBuilder(table).build();
    +        int cont = 0;
    +        try {
    +            while (scanner.hasMoreRows()) {
    +                RowResultIterator results = scanner.nextRows();
    +                while (results.hasNext()) {
    +                    RowResult result = results.next();
    +                    System.out.println(result.rowToString());
    +                    cont++;
    +                }
    +            }
    +            System.out.println("Number of rows: " + cont);
    +        } catch (Exception e) {
    --- End diff --
    
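    The exception handling needs improvement: catching the generic `Exception`
    here is too broad and makes it hard to tell what actually failed. The same
    pattern in `useTable` is worse: its `catch (Exception e)` blocks also catch
    the `KuduTableException` thrown inside the `try` ("The table doesn't exist")
    and replace it with an unrelated "host not valid" message, discarding the
    original exception. Please catch the specific exception type (e.g.
    `KuduException`) and preserve the original exception, e.g. as the cause,
    when rethrowing.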


> Kudu connector to read/write from/to Kudu
> -----------------------------------------
>
>                 Key: BAHIR-99
>                 URL: https://issues.apache.org/jira/browse/BAHIR-99
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: Rubén Casado
>            Assignee: Rubén Casado
>             Fix For: Flink-Next
>
>
> Java library to integrate Apache Kudu with Apache Flink. The main goal is to
> read and write Kudu data through Flink's DataSet and DataStream APIs.
> Data flow patterns:
> Batch
>  - Kudu -> DataSet<RowSerializable> -> Kudu
>  - Kudu -> DataSet<RowSerializable> -> other sink
>  - Other source -> DataSet<RowSerializable> -> other sink
> Stream
>  - Other source -> DataStream<RowSerializable> -> Kudu
> Code is available at https://github.com/rubencasado/Flink-Kudu



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to