Repository: metamodel Updated Branches: refs/heads/master a868f71c0 -> 1d67d3086
Note that with the builders I tried to follow the setup used in other projects, but this often didn't work for HBase's column families. The FindQualifiersDriver works, but only from the command line within a VM that runs Hadoop + HBase. It shows HBase working in combination with MapReduce. The class will be improved so its method can be called by other classes. All other classes have been tested by connecting them to our DataCleaner. I'll write more unit tests next. hbase/pom.xml changes: for 'hbase-client' I needed to remove 2 exclusions. I've added the 'hbase-server' dependency. Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/6ad602bb Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/6ad602bb Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/6ad602bb Branch: refs/heads/master Commit: 6ad602bb9af70ed7214b6d7e4f00cd1d9a037f71 Parents: fe02a59 Author: HUMANINFERENCE\g.dellemann <[email protected]> Authored: Fri Apr 20 16:36:27 2018 +0200 Committer: HUMANINFERENCE\g.dellemann <[email protected]> Committed: Fri Apr 20 16:36:27 2018 +0200 ---------------------------------------------------------------------- .../metamodel/AbstractUpdateCallback.java | 4 +- .../metamodel/data/AbstractRowBuilder.java | 33 ++++- .../insert/AbstractRowInsertionBuilder.java | 10 +- hbase/pom.xml | 111 +++++++++++++++ .../org/apache/metamodel/hbase/HBaseColumn.java | 50 +++++++ .../hbase/HBaseCreateTableBuilder.java | 71 ++++++++++ .../metamodel/hbase/HBaseDataContext.java | 44 +++--- .../apache/metamodel/hbase/HBaseFamilyMap.java | 5 +- .../org/apache/metamodel/hbase/HBaseRow.java | 6 +- .../hbase/HBaseRowInsertionBuilder.java | 76 ++++++++++ .../metamodel/hbase/HBaseTableDropBuilder.java | 37 +++++ .../metamodel/hbase/HBaseUpdateCallback.java | 134 ++++++++++++++++++ .../org/apache/metamodel/hbase/HBaseWriter.java | 138 +++++++++++++++++++ .../hbase/qualifiers/FindQualifiersDriver.java | 115 
++++++++++++++++ 14 files changed, 800 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/core/src/main/java/org/apache/metamodel/AbstractUpdateCallback.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/metamodel/AbstractUpdateCallback.java b/core/src/main/java/org/apache/metamodel/AbstractUpdateCallback.java index 4ff7279..b41b481 100644 --- a/core/src/main/java/org/apache/metamodel/AbstractUpdateCallback.java +++ b/core/src/main/java/org/apache/metamodel/AbstractUpdateCallback.java @@ -117,7 +117,7 @@ public abstract class AbstractUpdateCallback implements UpdateCallback { return update(getTable(tableName)); } - private Table getTable(String tableName) { + protected Table getTable(String tableName) { Table table = getDataContext().getTableByQualifiedLabel(tableName); if (table == null) { throw new IllegalArgumentException("No such table: " + tableName); @@ -159,7 +159,7 @@ public abstract class AbstractUpdateCallback implements UpdateCallback { UnsupportedOperationException { return new DeleteAndInsertBuilder(this, table); } - + public UpdateSummary getUpdateSummary() { return DefaultUpdateSummary.unknownUpdates(); } http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/core/src/main/java/org/apache/metamodel/data/AbstractRowBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/metamodel/data/AbstractRowBuilder.java b/core/src/main/java/org/apache/metamodel/data/AbstractRowBuilder.java index 144017c..4fdbc59 100644 --- a/core/src/main/java/org/apache/metamodel/data/AbstractRowBuilder.java +++ b/core/src/main/java/org/apache/metamodel/data/AbstractRowBuilder.java @@ -40,6 +40,10 @@ public abstract class AbstractRowBuilder<RB extends RowBuilder<?>> implements Ro this(table.getColumns()); } + public 
AbstractRowBuilder(Table table, int numberOfValues) { + this(table.getColumns(), numberOfValues); + } + public AbstractRowBuilder(List<Column> columns) { _columns = columns.toArray(new Column[columns.size()]); _explicitNulls = new boolean[_columns.length]; @@ -47,6 +51,14 @@ public abstract class AbstractRowBuilder<RB extends RowBuilder<?>> implements Ro _styles = new Style[_columns.length]; } + public AbstractRowBuilder(List<Column> columns, int numberOfValues) { + _columns = columns.toArray(new Column[columns.size()]); + _explicitNulls = new boolean[numberOfValues]; + _values = new Object[numberOfValues]; + _styles = new Style[numberOfValues]; + setColumns(columns); + } + /** * Gets a boolean array indicating if any of the values have been explicitly * set to null (as opposed to just not set) @@ -71,7 +83,8 @@ public abstract class AbstractRowBuilder<RB extends RowBuilder<?>> implements Ro @Override public final Row toRow() { - return new DefaultRow(new SimpleDataSetHeader(Arrays.stream(_columns).map(SelectItem::new).collect(Collectors.toList())), _values); + return new DefaultRow(new SimpleDataSetHeader(Arrays.stream(_columns).map(SelectItem::new).collect(Collectors + .toList())), _values); } @Override @@ -146,4 +159,22 @@ public abstract class AbstractRowBuilder<RB extends RowBuilder<?>> implements Ro } return false; } + + public void setColumns(List<Column> columns) { + if (columns.size() != _columns.length) { + throw new IllegalArgumentException("The amount of columns don't match"); + } + for (int i = 0; i < _columns.length; i++) { + _columns[i] = columns.get(i); + } + } + + public void setValues(Object[] values) { + if (values.length != _values.length) { + throw new IllegalArgumentException("The amount of values don't match"); + } + for (int i = 0; i < values.length; i++) { + _values[i] = values[i]; + } + } } http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/core/src/main/java/org/apache/metamodel/insert/AbstractRowInsertionBuilder.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/metamodel/insert/AbstractRowInsertionBuilder.java b/core/src/main/java/org/apache/metamodel/insert/AbstractRowInsertionBuilder.java index 58c0da8..185f942 100644 --- a/core/src/main/java/org/apache/metamodel/insert/AbstractRowInsertionBuilder.java +++ b/core/src/main/java/org/apache/metamodel/insert/AbstractRowInsertionBuilder.java @@ -18,6 +18,8 @@ */ package org.apache.metamodel.insert; +import java.util.List; + import org.apache.metamodel.UpdateCallback; import org.apache.metamodel.data.AbstractRowBuilder; import org.apache.metamodel.data.Row; @@ -25,8 +27,6 @@ import org.apache.metamodel.query.SelectItem; import org.apache.metamodel.schema.Column; import org.apache.metamodel.schema.Table; -import java.util.List; - /** * Abstract implementation of the {@link RowInsertionBuilder} interface, * provided as a convenience to {@link RowInsertable} implementations. Handles @@ -44,6 +44,12 @@ public abstract class AbstractRowInsertionBuilder<U extends UpdateCallback> exte _table = table; } + public AbstractRowInsertionBuilder(U updateCallback, Table table, int numberOfValues) { + super(table, numberOfValues); + _updateCallback = updateCallback; + _table = table; + } + @Override public Table getTable() { return _table; http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/pom.xml ---------------------------------------------------------------------- diff --git a/hbase/pom.xml b/hbase/pom.xml index 5a0cd99..aed4f63 100644 --- a/hbase/pom.xml +++ b/hbase/pom.xml @@ -118,6 +118,113 @@ <groupId>commons-httpclient</groupId> <artifactId>commons-httpclient</artifactId> </exclusion> + <!-- <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> --> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <!-- <exclusion> + 
<groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> --> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <artifactId>log4j</artifactId> + <groupId>log4j</groupId> + </exclusion> + <exclusion> + <artifactId>commons-logging</artifactId> + <groupId>commons-logging</groupId> + </exclusion> + <exclusion> + <artifactId>jetty</artifactId> + <groupId>org.mortbay.jetty</groupId> + </exclusion> + <exclusion> + <artifactId>jetty-util</artifactId> + <groupId>org.mortbay.jetty</groupId> + </exclusion> + <exclusion> + <groupId>com.github.stephenc.findbugs</groupId> + <artifactId>findbugs-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>google-collections</groupId> + <artifactId>google-collections</artifactId> + </exclusion> + <exclusion> + <groupId>net.sourceforge.collections</groupId> + <artifactId>collections-generic</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + 
<groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-api-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </exclusion> <exclusion> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> @@ -138,6 +245,10 @@ <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-xc</artifactId> </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api-2.5</artifactId> + </exclusion> </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseColumn.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseColumn.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseColumn.java new file mode 100644 index 0000000..abe37d1 --- /dev/null +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseColumn.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.hbase; + +public final class HBaseColumn { + private String columnFamily; + private String qualifier; + + public HBaseColumn() { + columnFamily = ""; + qualifier = ""; + } + + public HBaseColumn(String columnFamily, String qualifier) { + this.columnFamily = columnFamily; + this.qualifier = qualifier; + } + + public String getColumnFamily() { + return columnFamily; + } + + public void setColumnFamily(String columnFamily) { + this.columnFamily = columnFamily; + } + + public String getQualifier() { + return qualifier; + } + + public void setQualifier(String qualifier) { + this.qualifier = qualifier; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseCreateTableBuilder.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseCreateTableBuilder.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseCreateTableBuilder.java new file mode 100644 index 0000000..e1f1257 --- /dev/null +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseCreateTableBuilder.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.metamodel.hbase; + +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.create.AbstractTableCreationBuilder; +import org.apache.metamodel.schema.MutableSchema; +import org.apache.metamodel.schema.MutableTable; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; +import org.apache.metamodel.util.SimpleTableDef; + +public class HBaseCreateTableBuilder extends AbstractTableCreationBuilder<HBaseUpdateCallback> { + + private Set<String> columnFamilies; + + public HBaseCreateTableBuilder(HBaseUpdateCallback updateCallback, Schema schema, String name, + HBaseColumn[] outputColumns) { + super(updateCallback, schema, name); + if (!(schema instanceof MutableSchema)) { + throw new IllegalArgumentException("Not a valid schema: " + schema); + } + columnFamilies = new LinkedHashSet<String>(); + for (int i = 0; i < outputColumns.length; i++) { + columnFamilies.add(outputColumns[i].getColumnFamily()); + } + } + + @Override + public Table execute() throws MetaModelException { + final MutableTable table = getTable(); + final SimpleTableDef emptyTableDef = new SimpleTableDef(table.getName(), columnFamilies.toArray( + new String[columnFamilies.size()])); + + final HBaseUpdateCallback updateCallback = (HBaseUpdateCallback) getUpdateCallback(); + + try { + final HBaseWriter HbaseWriter = new HBaseWriter(HBaseDataContext.createConfig(updateCallback + .getConfiguration())); + 
HbaseWriter.createTable(table.getName(), columnFamilies); + } catch (IOException e) { + e.printStackTrace(); + } + + final MutableSchema schema = (MutableSchema) table.getSchema(); + schema.addTable(new HBaseTable(updateCallback.getDataContext(), emptyTableDef, schema, + HBaseConfiguration.DEFAULT_ROW_KEY_TYPE)); + return schema.getTableByName(table.getName()); + } + +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java index 46866b7..6b48723 100644 --- a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseDataContext.java @@ -19,7 +19,6 @@ package org.apache.metamodel.hbase; import java.io.IOException; -import java.lang.reflect.Method; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -73,6 +72,9 @@ public class HBaseDataContext extends QueryPostprocessDataContext { Configuration config = createConfig(configuration); _configuration = configuration; _connection = createConnection(config); + + // HBaseUpdateCallback updateCallback = new HBaseUpdateCallback(this); + // updateCallback.writeRow(); } /** @@ -85,6 +87,9 @@ public class HBaseDataContext extends QueryPostprocessDataContext { super(false); _configuration = configuration; _connection = connection; + + // HBaseUpdateCallback updateCallback = new HBaseUpdateCallback(this); + // updateCallback.writeRow(); } private Connection createConnection(Configuration config) { @@ -95,7 +100,7 @@ public class HBaseDataContext extends QueryPostprocessDataContext { } } - private Configuration createConfig(HBaseConfiguration configuration) { + static protected Configuration createConfig(HBaseConfiguration configuration) { 
Configuration config = org.apache.hadoop.hbase.HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", configuration.getZookeeperHostname()); config.set("hbase.zookeeper.property.clientPort", Integer.toString(configuration.getZookeeperPort())); @@ -123,28 +128,28 @@ public class HBaseDataContext extends QueryPostprocessDataContext { } @Override - protected Schema getMainSchema() throws MetaModelException { + public Schema getMainSchema() throws MetaModelException { final MutableSchema schema = new MutableSchema(_configuration.getSchemaName()); - try { - SimpleTableDef[] tableDefinitions = _configuration.getTableDefinitions(); - if (tableDefinitions == null) { + SimpleTableDef[] tableDefinitions = _configuration.getTableDefinitions(); + if (tableDefinitions == null) { + try { final HTableDescriptor[] tables = getAdmin().listTables(); tableDefinitions = new SimpleTableDef[tables.length]; for (int i = 0; i < tables.length; i++) { SimpleTableDef emptyTableDef = new SimpleTableDef(tables[i].getNameAsString(), new String[0]); tableDefinitions[i] = emptyTableDef; } + } catch (IllegalArgumentException | IOException e) { + throw new MetaModelException(e); } + } - for (SimpleTableDef tableDef : tableDefinitions) { - schema.addTable(new HBaseTable(this, tableDef, schema, _configuration.getDefaultRowKeyType())); - } - - return schema; - } catch (Exception e) { - throw new MetaModelException(e); + for (SimpleTableDef tableDef : tableDefinitions) { + schema.addTable(new HBaseTable(this, tableDef, schema, _configuration.getDefaultRowKeyType())); } + + return schema; } /** @@ -242,17 +247,6 @@ public class HBaseDataContext extends QueryPostprocessDataContext { } private void setMaxRows(Scan scan, int maxRows) { - try { - // in old versions of the HBase API, the 'setMaxResultSize' method - // is not available - Method method = scan.getClass().getMethod("setMaxResultSize", long.class); - method.invoke(scan, (long) maxRows); - logger.debug("Succesfully set maxRows using 
Scan.setMaxResultSize({})", maxRows); - } catch (Exception e) { - logger.debug( - "HBase API does not have Scan.setMaxResultSize(long) method, setting maxRows using PageFilter.", e); - scan.setFilter(new PageFilter(maxRows)); - } + scan.setFilter(new PageFilter(maxRows)); } - } http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseFamilyMap.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseFamilyMap.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseFamilyMap.java index 38a6848..7483c63 100644 --- a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseFamilyMap.java +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseFamilyMap.java @@ -18,7 +18,6 @@ */ package org.apache.metamodel.hbase; -import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.NavigableMap; @@ -106,9 +105,9 @@ public class HBaseFamilyMap implements Map<Object, Object> { if (sb.length() > 1) { sb.append(','); } - sb.append(Arrays.toString(entry.getKey())); + sb.append(new String(entry.getKey())); sb.append('='); - sb.append(Arrays.toString(entry.getValue())); + sb.append(new String(entry.getValue())); } sb.append('}'); return sb.toString(); http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRow.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRow.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRow.java index b091ae1..2820602 100644 --- a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRow.java +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRow.java @@ -59,9 +59,13 @@ final class HBaseRow extends AbstractRow implements Row { } return rowKey; } - final int colonIndex = name.indexOf(':'); if (colonIndex != -1) { + /* + * I 
think this is DEATH code! The first line of this method: implementation + * _header.getSelectItem(index).getColumn() will always return only a columnfamily with our current + * implementations. + */ byte[] family = name.substring(0, colonIndex).getBytes(); byte[] qualifier = name.substring(colonIndex + 1).getBytes(); byte[] value = _result.getValue(family, qualifier); http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRowInsertionBuilder.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRowInsertionBuilder.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRowInsertionBuilder.java new file mode 100644 index 0000000..358d743 --- /dev/null +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseRowInsertionBuilder.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.hbase; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.insert.AbstractRowInsertionBuilder; +import org.apache.metamodel.schema.Table; + +public class HBaseRowInsertionBuilder extends AbstractRowInsertionBuilder<HBaseUpdateCallback> { + + private final HBaseColumn[] _outputColumns; + + public HBaseRowInsertionBuilder(HBaseUpdateCallback updateCallback, Table table, HBaseColumn[] outputColumns) { + super(updateCallback, table, outputColumns.length); + _outputColumns = outputColumns; + } + + @Override + public void execute() throws MetaModelException { + checkForMatchingColumnFamilies(getTable(), _outputColumns); + getUpdateCallback().writeRow((HBaseTable) getTable(), _outputColumns, getValues()); + } + + private void checkForMatchingColumnFamilies(Table table, HBaseColumn[] outputColumns) { + for (int i = 0; i < outputColumns.length; i++) { + if (!outputColumns[i].getColumnFamily().equals(HBaseDataContext.FIELD_ID)) { + boolean matchingColumnFound = false; + int indexOfTablesColumn = 0; + + while (!matchingColumnFound && indexOfTablesColumn < table.getColumnCount()) { + if (outputColumns[i].getColumnFamily().equals(table.getColumn(indexOfTablesColumn).getName())) { + matchingColumnFound = true; + } else { + indexOfTablesColumn++; + } + } + + if (!matchingColumnFound) { + throw new IllegalArgumentException(String.format( + "OutputColumnFamily: %s doesn't exist in the schema of the table", outputColumns[i] + .getColumnFamily())); + } + } + } + } + + public HBaseColumn[] getOutputColumns() { + return _outputColumns; + } + + public void setOutputColumns(HBaseColumn[] outputColumns) { + if (outputColumns.length != _outputColumns.length) { + throw new IllegalArgumentException("The amount of outputColumns don't match"); + } + for (int i = 0; i < outputColumns.length; i++) { + _outputColumns[i] = outputColumns[i]; + } + } + +} 
http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseTableDropBuilder.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseTableDropBuilder.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseTableDropBuilder.java new file mode 100644 index 0000000..5ab38d9 --- /dev/null +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseTableDropBuilder.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.hbase; + +import org.apache.metamodel.MetaModelException; +import org.apache.metamodel.drop.AbstractTableDropBuilder; +import org.apache.metamodel.schema.Table; + +public class HBaseTableDropBuilder extends AbstractTableDropBuilder { + private final HBaseUpdateCallback _updateCallback; + + public HBaseTableDropBuilder(Table table, HBaseUpdateCallback updateCallback) { + super(table); + _updateCallback = updateCallback; + } + + @Override + public void execute() throws MetaModelException { + _updateCallback.dropTableExecute(getTable()); + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseUpdateCallback.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseUpdateCallback.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseUpdateCallback.java new file mode 100644 index 0000000..271aa87 --- /dev/null +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseUpdateCallback.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.hbase; + +import java.io.IOException; + +import org.apache.metamodel.AbstractUpdateCallback; +import org.apache.metamodel.UpdateCallback; +import org.apache.metamodel.create.TableCreationBuilder; +import org.apache.metamodel.delete.RowDeletionBuilder; +import org.apache.metamodel.drop.TableDropBuilder; +import org.apache.metamodel.insert.RowInsertionBuilder; +import org.apache.metamodel.schema.MutableSchema; +import org.apache.metamodel.schema.Schema; +import org.apache.metamodel.schema.Table; + +public class HBaseUpdateCallback extends AbstractUpdateCallback implements UpdateCallback { + + private final HBaseConfiguration _configuration; + + private final HBaseDataContext _dataContext; + + public HBaseUpdateCallback(HBaseDataContext dataContext) { + super(dataContext); + _configuration = dataContext.getConfiguration(); + _dataContext = dataContext; + } + + @Override + public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException, + IllegalStateException { + throw new UnsupportedOperationException( + "Use createTable(Schema schema, String name, HBaseColumn[] outputColumns)"); + } + + public HBaseCreateTableBuilder createTable(Schema schema, String name, HBaseColumn[] outputColumns) + throws IllegalArgumentException, IllegalStateException { + return new HBaseCreateTableBuilder(this, schema, name, outputColumns); + } + + @Override + public boolean isDropTableSupported() { + return true; + } + + @Override + public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + return new HBaseTableDropBuilder(table, this); + } + + public void dropTableExecute(Table table) { + try { + final HBaseWriter HbaseWriter = new HBaseWriter(HBaseDataContext.createConfig(_configuration)); + HbaseWriter.dropTable(table.getName()); + MutableSchema schema = (MutableSchema) table.getSchema(); + schema.removeTable(table); + } catch (IOException 
e) { + e.printStackTrace(); + } + } + + @Override + public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + throw new UnsupportedOperationException("Use insertInto(String tableName, HBaseColumn[] outputColumns)"); + } + + public HBaseRowInsertionBuilder insertInto(String tableName, HBaseColumn[] outputColumns) + throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException { + Table table = getTable(tableName); + return insertInto(table, outputColumns); + } + + public HBaseRowInsertionBuilder insertInto(Table table, HBaseColumn[] outputColumns) + throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException { + validateTable(table); + return new HBaseRowInsertionBuilder(this, table, outputColumns); + } + + private void validateTable(Table table) { + if (!(table instanceof HBaseTable)) { + throw new IllegalArgumentException("Not a valid HBase table: " + table); + } + } + + protected synchronized void writeRow(HBaseTable hBaseTable, HBaseColumn[] outputColumns, Object[] values) { + try { + final HBaseWriter HbaseWriter = new HBaseWriter(HBaseDataContext.createConfig(_configuration)); + HbaseWriter.writeRow(hBaseTable, outputColumns, values); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public boolean isDeleteSupported() { + return false; + } + + @Override + public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException, + UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + public boolean tableAlreadyExists(String tableName) { + return _dataContext.getMainSchema().getTableByName(tableName) == null ? 
false : true; + } + + public HBaseConfiguration getConfiguration() { + return _configuration; + } + + public HBaseDataContext getDataContext() { + return _dataContext; + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/HBaseWriter.java ---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/HBaseWriter.java b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseWriter.java new file mode 100644 index 0000000..f8897da --- /dev/null +++ b/hbase/src/main/java/org/apache/metamodel/hbase/HBaseWriter.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.hbase; + +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.metamodel.MetaModelException; + +public final class HBaseWriter extends Configured { + + static final byte[] INFO_COLUMNFAMILY = Bytes.toBytes("info"); + static final byte[] NAME_QUALIFIER = Bytes.toBytes("name"); + static final byte[] LOCATION_QUALIFIER = Bytes.toBytes("location"); + static final byte[] DESCRIPTION_QUALIFIER = Bytes.toBytes("description"); + + private final Connection _connection; + + public HBaseWriter(Configuration configuration) throws IOException { + _connection = ConnectionFactory.createConnection(configuration); + } + + public void writeRow(HBaseTable hBaseTable, HBaseColumn[] outputColumns, Object[] values) throws IOException { + try { + Table table = _connection.getTable(TableName.valueOf(hBaseTable.getName())); + try { + int indexOfIdColumn = 0; + boolean idColumnFound = false; + while (!idColumnFound && indexOfIdColumn < outputColumns.length) { + if (outputColumns[indexOfIdColumn].getColumnFamily().equals(HBaseDataContext.FIELD_ID)) { + idColumnFound = true; + } else { + indexOfIdColumn++; + } + } + if (!idColumnFound) { + throw new MetaModelException("The ID Column family was not found"); + } + + Put put = new Put(Bytes.toBytes(values[indexOfIdColumn].toString())); + + for (int i = 0; i < outputColumns.length; i++) { + if 
(!outputColumns[i].getColumnFamily().equals(HBaseDataContext.FIELD_ID)) { + put.addColumn(Bytes.toBytes(outputColumns[i].getColumnFamily()), Bytes.toBytes(outputColumns[i] + .getQualifier()), Bytes.toBytes(values[i].toString())); + } + } + table.put(put); + } finally { + table.close(); + } + } finally { + _connection.close(); + } + } + + public void deleteRow(HBaseTable hBaseTable, Object key) throws IOException { + try { + Table table = _connection.getTable(TableName.valueOf(hBaseTable.getName())); + try { + table.delete(new Delete(Bytes.toBytes(key.toString()))); + } finally { + table.close(); + } + } finally { + _connection.close(); + } + } + + public void createTable(String tableName, Set<String> columnFamilies) throws IOException { + try { + // Create table + Admin admin = _connection.getAdmin(); + try { + TableName hBasetableName = TableName.valueOf(tableName); + HTableDescriptor tableDescriptor = new HTableDescriptor(hBasetableName); + for (String columnFamilie : columnFamilies) { + if (!columnFamilie.equals(HBaseDataContext.FIELD_ID)) { + tableDescriptor.addFamily(new HColumnDescriptor(columnFamilie)); + } + } + admin.createTable(tableDescriptor); + HTableDescriptor[] tables = admin.listTables(); + if (tables.length != 1 && Bytes.equals(hBasetableName.getName(), tables[0].getTableName().getName())) { + throw new IOException("Failed create of table"); + } + } finally { + admin.close(); + } + } finally { + _connection.close(); + } + + } + + public void dropTable(String tableName) throws IOException { + try { + Admin admin = _connection.getAdmin(); + try { + TableName hBasetableName = TableName.valueOf(tableName); + admin.disableTable(hBasetableName); + admin.deleteTable(hBasetableName); + } finally { + admin.close(); + } + } finally { + _connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/metamodel/blob/6ad602bb/hbase/src/main/java/org/apache/metamodel/hbase/qualifiers/FindQualifiersDriver.java 
---------------------------------------------------------------------- diff --git a/hbase/src/main/java/org/apache/metamodel/hbase/qualifiers/FindQualifiersDriver.java b/hbase/src/main/java/org/apache/metamodel/hbase/qualifiers/FindQualifiersDriver.java new file mode 100644 index 0000000..3efa311 --- /dev/null +++ b/hbase/src/main/java/org/apache/metamodel/hbase/qualifiers/FindQualifiersDriver.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.metamodel.hbase.qualifiers; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class FindQualifiersDriver extends Configured implements Tool { + + static class OnlyColumnNameMapper extends TableMapper<Text, Text> { + @Override + protected void map(ImmutableBytesWritable key, Result value, final Context context) throws IOException, + InterruptedException { + CellScanner cellScanner = value.cellScanner(); + while (cellScanner.advance()) { + + Cell cell = cellScanner.current(); + byte[] q = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + + context.write(new Text(q), new Text()); + } + } + } + + static class OnlyColumnNameReducer extends Reducer<Text, Text, Text, Text> { + + @Override + protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, + InterruptedException { + context.write(new Text(key), new Text()); + } + } + + @Override + public int run(String[] args) throws Exception { + Path outputPath = new Path("output/"); + byte[] tableName = new String("ietsanders2").getBytes(); + byte[] columnFamilyName = 
new String("data").getBytes(); + + Configuration configuration = createConfig(); + FileSystem fileSystem = FileSystem.get(configuration); + fileSystem.delete(outputPath, true); + + Job job = Job.getInstance(configuration, "Distinct_columns"); + job.setJarByClass(this.getClass()); + + Scan scan = new Scan(); + scan.setBatch(500); + scan.addFamily(columnFamilyName); + scan.setFilter(new KeyOnlyFilter()); // scan only key part of KeyValue (raw, column family, column) + scan.setCacheBlocks(false); // don't set to true for MR jobs + + TextOutputFormat.setOutputPath(job, outputPath); + + TableMapReduceUtil.initTableMapperJob(tableName, scan, OnlyColumnNameMapper.class, // mapper + Text.class, // mapper output key + Text.class, // mapper output value + job); + + job.setNumReduceTasks(1); + job.setReducerClass(OnlyColumnNameReducer.class); + + return job.waitForCompletion(true) ? 0 : 1; + } + + protected Configuration createConfig() { + Configuration config = org.apache.hadoop.hbase.HBaseConfiguration.create(); + config.set("hbase.zookeeper.quorum", "bigdatavm"); + config.set("hbase.zookeeper.property.clientPort", Integer.toString(2181)); + config.set("hbase.client.retries.number", Integer.toString(1)); + config.set("zookeeper.session.timeout", Integer.toString(5000)); + config.set("zookeeper.recovery.retry", Integer.toString(1)); + return config; + } + + public static void main(String[] args) throws Exception { + int exitCode = ToolRunner.run(new FindQualifiersDriver(), args); + System.exit(exitCode); + } +}
