GIRAPH-1012: Remove giraph-hive. Summary: We are not using hive-io-experimental anymore and we'll be deprecating that project. Since we are not aware of anyone else using it, we are thinking of removing giraph-hive completely from the repository. Please comment if you have any objections.
Test Plan: compile with different profiles Reviewers: ikabiljo, sergey.edunov Differential Revision: https://reviews.facebook.net/D40053 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ad27a291 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ad27a291 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ad27a291 Branch: refs/heads/trunk Commit: ad27a2914d57a3ee174b34bc3a2a4de9f3aca215 Parents: 819d6d3 Author: Maja Kabiljo <[email protected]> Authored: Fri Jun 12 11:48:18 2015 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri Jun 12 13:02:13 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + giraph-dist/pom.xml | 4 - giraph-hive/pom.xml | 157 ---- giraph-hive/src/main/assembly/compile.xml | 39 - .../apache/giraph/hive/HiveGiraphRunner.java | 831 ----------------- .../giraph/hive/column/HiveReadableColumn.java | 192 ---- .../giraph/hive/column/HiveWritableColumn.java | 163 ---- .../apache/giraph/hive/column/package-info.java | 21 - .../DefaultConfigurableAndTableSchemaAware.java | 50 - .../giraph/hive/common/GiraphHiveConstants.java | 87 -- .../giraph/hive/common/HiveInputOptions.java | 183 ---- .../apache/giraph/hive/common/HiveParsing.java | 166 ---- .../apache/giraph/hive/common/HiveUtils.java | 394 -------- .../giraph/hive/common/LanguageAndType.java | 78 -- .../apache/giraph/hive/common/package-info.java | 21 - .../giraph/hive/input/HiveInputChecker.java | 37 - .../giraph/hive/input/RecordReaderWrapper.java | 59 -- .../hive/input/edge/AbstractHiveToEdge.java | 38 - .../hive/input/edge/HiveEdgeInputFormat.java | 112 --- .../giraph/hive/input/edge/HiveEdgeReader.java | 104 --- .../giraph/hive/input/edge/HiveToEdge.java | 49 - .../hive/input/edge/SimpleHiveToEdge.java | 100 -- .../input/edge/examples/HiveIntDoubleEdge.java | 58 -- .../input/edge/examples/HiveIntNullEdge.java | 60 -- 
.../hive/input/edge/examples/package-info.java | 21 - .../giraph/hive/input/edge/package-info.java | 21 - .../input/mapping/AbstractHiveToMapping.java | 39 - .../input/mapping/HiveMappingInputFormat.java | 116 --- .../hive/input/mapping/HiveMappingReader.java | 100 -- .../hive/input/mapping/HiveToMapping.java | 44 - .../hive/input/mapping/SimpleHiveToMapping.java | 105 --- .../mapping/examples/LongByteHiveToMapping.java | 56 -- .../examples/LongInt2ByteHiveToMapping.java | 81 -- .../input/mapping/examples/package-info.java | 22 - .../giraph/hive/input/mapping/package-info.java | 22 - .../apache/giraph/hive/input/package-info.java | 21 - .../hive/input/vertex/AbstractHiveToVertex.java | 40 - .../giraph/hive/input/vertex/HiveToVertex.java | 50 - .../input/vertex/HiveVertexInputFormat.java | 113 --- .../hive/input/vertex/HiveVertexReader.java | 109 --- .../hive/input/vertex/SimpleHiveToVertex.java | 120 --- .../input/vertex/SimpleNoEdgesHiveToVertex.java | 41 - .../examples/HiveIntDoubleDoubleVertex.java | 59 -- .../vertex/examples/HiveIntIntNullVertex.java | 58 -- .../vertex/examples/HiveIntNullNullVertex.java | 59 -- .../input/vertex/examples/package-info.java | 21 - .../giraph/hive/input/vertex/package-info.java | 21 - .../giraph/hive/jython/HiveJythonRunner.java | 126 --- .../giraph/hive/jython/HiveJythonUtils.java | 910 ------------------- .../giraph/hive/jython/JythonColumnReader.java | 82 -- .../giraph/hive/jython/JythonColumnWriter.java | 83 -- .../apache/giraph/hive/jython/JythonHiveIO.java | 24 - .../giraph/hive/jython/JythonHiveReader.java | 36 - .../giraph/hive/jython/JythonHiveToEdge.java | 107 --- .../giraph/hive/jython/JythonHiveToVertex.java | 99 -- .../giraph/hive/jython/JythonHiveWriter.java | 36 - .../hive/jython/JythonReadableColumn.java | 179 ---- .../giraph/hive/jython/JythonVertexToHive.java | 81 -- .../apache/giraph/hive/jython/package-info.java | 22 - .../hive/output/AbstractVertexToHive.java | 37 - .../giraph/hive/output/HiveRecordSaver.java 
| 37 - .../hive/output/HiveVertexOutputFormat.java | 116 --- .../giraph/hive/output/HiveVertexWriter.java | 120 --- .../giraph/hive/output/SimpleVertexToHive.java | 58 -- .../apache/giraph/hive/output/VertexToHive.java | 77 -- .../output/examples/HiveOutputIntIntVertex.java | 48 - .../hive/output/examples/package-info.java | 22 - .../apache/giraph/hive/output/package-info.java | 22 - .../org/apache/giraph/hive/package-info.java | 21 - .../hive/primitives/PrimitiveValueReader.java | 88 -- .../hive/primitives/PrimitiveValueWriter.java | 88 -- .../giraph/hive/primitives/package-info.java | 21 - .../giraph/hive/values/HiveValueReader.java | 59 -- .../giraph/hive/values/HiveValueWriter.java | 59 -- .../apache/giraph/hive/values/package-info.java | 22 - .../apache/giraph/hive/GiraphHiveTestBase.java | 30 - .../java/org/apache/giraph/hive/Helpers.java | 84 -- .../computations/ComputationCountEdges.java | 36 - .../hive/computations/ComputationSumEdges.java | 42 - .../giraph/hive/computations/package-info.java | 22 - .../giraph/hive/input/CheckInputTest.java | 133 --- .../giraph/hive/input/HiveEdgeInputTest.java | 144 --- .../giraph/hive/input/HiveVertexInputTest.java | 144 --- .../apache/giraph/hive/input/package-info.java | 22 - .../hive/jython/TestHiveJythonComplexTypes.java | 157 ---- .../hive/jython/TestHiveJythonPrimitives.java | 128 --- .../giraph/hive/output/CheckOutputTest.java | 108 --- .../giraph/hive/output/HiveOutputTest.java | 150 --- .../apache/giraph/hive/output/package-info.java | 22 - .../giraph/jython/count-edges-launcher.py | 43 - .../jython/fake-label-propagation-launcher.py | 52 -- .../jython/fake-label-propagation-worker.py | 91 -- pom.xml | 12 - src/site/xdoc/javadoc_modules.xml | 1 - 94 files changed, 2 insertions(+), 8443 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/CHANGELOG ---------------------------------------------------------------------- diff 
--git a/CHANGELOG b/CHANGELOG index 8836f3c..2077a2d 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-1012: Remove giraph-hive (majakabiljo) + GIRAPH-1009: Spammy 'lost reservation' messages from ZooKeeper in workers' log at the end of the computation. (heslami via aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-dist/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-dist/pom.xml b/giraph-dist/pom.xml index 70891ce..310f97f 100644 --- a/giraph-dist/pom.xml +++ b/giraph-dist/pom.xml @@ -101,10 +101,6 @@ </dependency> <dependency> <groupId>org.apache.giraph</groupId> - <artifactId>giraph-hive</artifactId> - </dependency> - <dependency> - <groupId>org.apache.giraph</groupId> <artifactId>giraph-examples</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-hive/pom.xml b/giraph-hive/pom.xml deleted file mode 100644 index 1772f63..0000000 --- a/giraph-hive/pom.xml +++ /dev/null @@ -1,157 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.giraph</groupId> - <artifactId>giraph-parent</artifactId> - <version>1.2.0-SNAPSHOT</version> - </parent> - <artifactId>giraph-hive</artifactId> - <packaging>jar</packaging> - - <name>Apache Giraph Hive I/O</name> - <url>http://giraph.apache.org/giraph-hive/</url> - <description>Giraph Hive input/output classes</description> - - <properties> - <top.dir>${project.basedir}/..</top.dir> - </properties> - - <profiles> - <profile> - <id>hadoop_2</id> - <properties> - <surefire.skip>true</surefire.skip> - </properties> - </profile> - </profiles> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-site-plugin</artifactId> - <configuration> - <siteDirectory>${project.basedir}/src/site</siteDirectory> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.6</version> - <configuration> - <skip>${surefire.skip}</skip> - <systemProperties> - <property> - <name>prop.jarLocation</name> - <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value> - </property> - </systemProperties> - </configuration> - </plugin> - <plugin> - 
<groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - - <dependencies> - <!-- compile dependencies. sorted lexicographically. --> - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </dependency> - <dependency> - <groupId>com.facebook.hiveio</groupId> - <artifactId>hive-io-exp-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.giraph</groupId> - <artifactId>giraph-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - </dependency> - <dependency> - <groupId>org.python</groupId> - <artifactId>jython</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </dependency> - - <!-- runtime dependency --> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>runtime</scope> - </dependency> - - <!-- test dependencies. sorted lexicographically. 
--> - <dependency> - <groupId>org.apache.giraph</groupId> - <artifactId>giraph-core</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.facebook.hiveio</groupId> - <artifactId>hive-io-exp-core</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.facebook.hiveio</groupId> - <artifactId>hive-io-exp-testing</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/assembly/compile.xml ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/assembly/compile.xml b/giraph-hive/src/main/assembly/compile.xml deleted file mode 100644 index fcaffa6..0000000 --- a/giraph-hive/src/main/assembly/compile.xml +++ /dev/null @@ -1,39 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> - <id>jar-with-dependencies</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - - <dependencySets> - <dependencySet> - <useProjectArtifact>true</useProjectArtifact> - <outputDirectory>/</outputDirectory> - <unpackOptions> - <excludes> - <exclude>META-INF/LICENSE</exclude> - </excludes> - </unpackOptions> - <unpack>true</unpack> - <scope>runtime</scope> - </dependencySet> - </dependencySets> -</assembly> http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java deleted file mode 100644 index 8849f55..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java +++ /dev/null @@ -1,831 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.graph.Computation; -import org.apache.giraph.hive.common.HiveUtils; -import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat; -import org.apache.giraph.hive.input.edge.HiveToEdge; -import org.apache.giraph.hive.input.mapping.HiveMappingInputFormat; -import org.apache.giraph.hive.input.mapping.HiveToMapping; -import org.apache.giraph.hive.input.vertex.HiveToVertex; -import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat; -import org.apache.giraph.hive.output.HiveVertexOutputFormat; -import org.apache.giraph.hive.output.VertexToHive; -import org.apache.giraph.io.formats.multi.EdgeInputFormatDescription; -import org.apache.giraph.io.formats.multi.InputFormatDescription; -import org.apache.giraph.io.formats.multi.MultiEdgeInputFormat; -import org.apache.giraph.io.formats.multi.MultiVertexInputFormat; -import org.apache.giraph.io.formats.multi.VertexInputFormatDescription; -import org.apache.giraph.job.GiraphJob; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Logger; - -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -import java.util.Arrays; -import java.util.List; - -import static 
org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_MAPPING_INPUT; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PROFILE_ID; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_TABLE; -import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS; - -/** - * Hive Giraph Runner - */ -public class HiveGiraphRunner implements Tool { - /** logger */ - private static final Logger LOG = Logger.getLogger(HiveGiraphRunner.class); - /** Prefix for log statements */ - private static final String LOG_PREFIX = "\t"; - - /** workers */ - protected int workers; - /** is verbose */ - protected boolean isVerbose; - - /** vertex class. */ - private Class<? extends Computation> computationClass; - - /** Descriptions of vertex input formats */ - private List<VertexInputFormatDescription> vertexInputDescriptions = - Lists.newArrayList(); - - /** Descriptions of edge input formats */ - private List<EdgeInputFormatDescription> edgeInputDescriptions = - Lists.newArrayList(); - - /** Hive Mapping reader */ - private Class<? extends HiveToMapping> hiveToMappingClass; - /** Hive Vertex writer */ - private Class<? extends VertexToHive> vertexToHiveClass; - /** Skip output? (Useful for testing without writing) */ - private boolean skipOutput = false; - - /** Configuration */ - private Configuration conf; - - /** Create a new runner */ - public HiveGiraphRunner() { - conf = new HiveConf(getClass()); - } - - public Class<? extends Computation> getComputationClass() { - return computationClass; - } - - public void setComputationClass( - Class<? 
extends Computation> computationClass) { - this.computationClass = computationClass; - } - - public List<VertexInputFormatDescription> getVertexInputDescriptions() { - return vertexInputDescriptions; - } - - /** - * Whether to use vertex input. - * - * @return true if vertex input enabled (at least one HiveToVertex is set). - */ - public boolean hasVertexInput() { - return !vertexInputDescriptions.isEmpty(); - } - - /** - * Add vertex input - * - * @param hiveToVertexClass HiveToVertex class to use - * @param tableName Table name - * @param partitionFilter Partition filter, or null if no filter used - * @param additionalOptions Additional options, in the form "option=value" - */ - public void addVertexInput(Class<? extends HiveToVertex> hiveToVertexClass, - String tableName, String partitionFilter, String ... additionalOptions) { - VertexInputFormatDescription description = - new VertexInputFormatDescription(HiveVertexInputFormat.class); - description.addParameter( - HIVE_VERTEX_INPUT.getClassOpt().getKey(), hiveToVertexClass.getName()); - description.addParameter(HIVE_VERTEX_INPUT.getProfileIdOpt().getKey(), - "vertex_input_profile_" + vertexInputDescriptions.size()); - description.addParameter( - HIVE_VERTEX_INPUT.getTableOpt().getKey(), tableName); - if (partitionFilter != null && !partitionFilter.isEmpty()) { - description.addParameter( - HIVE_VERTEX_INPUT.getPartitionOpt().getKey(), partitionFilter); - } - addAdditionalOptions(description, additionalOptions); - vertexInputDescriptions.add(description); - } - - public List<EdgeInputFormatDescription> getEdgeInputDescriptions() { - return edgeInputDescriptions; - } - - /** - * Whether to use edge input. - * - * @return true if edge input enabled (at least one HiveToEdge is set). 
- */ - public boolean hasEdgeInput() { - return !edgeInputDescriptions.isEmpty(); - } - - /** - * Add edge input - * - * @param hiveToEdgeClass HiveToEdge class to use - * @param tableName Table name - * @param partitionFilter Partition filter, or null if no filter used - * @param additionalOptions Additional options, in the form "option=value" - */ - public void addEdgeInput(Class<? extends HiveToEdge> hiveToEdgeClass, - String tableName, String partitionFilter, String ... additionalOptions) { - EdgeInputFormatDescription description = - new EdgeInputFormatDescription(HiveEdgeInputFormat.class); - description.addParameter( - HIVE_EDGE_INPUT.getClassOpt().getKey(), hiveToEdgeClass.getName()); - description.addParameter(HIVE_EDGE_INPUT.getProfileIdOpt().getKey(), - "edge_input_profile_" + edgeInputDescriptions.size()); - description.addParameter( - HIVE_EDGE_INPUT.getTableOpt().getKey(), tableName); - if (partitionFilter != null && !partitionFilter.isEmpty()) { - description.addParameter( - HIVE_EDGE_INPUT.getPartitionOpt().getKey(), partitionFilter); - } - addAdditionalOptions(description, additionalOptions); - edgeInputDescriptions.add(description); - } - - /** - * Add additional options to InputFormatDescription - * - * @param description InputFormatDescription - * @param additionalOptions Additional options - */ - private static void addAdditionalOptions(InputFormatDescription description, - String ... additionalOptions) { - for (String additionalOption : additionalOptions) { - String[] nameValue = split(additionalOption, "="); - if (nameValue.length != 2) { - throw new IllegalStateException("Invalid additional option format " + - additionalOption + ", 'name=value' format expected"); - } - description.addParameter(nameValue[0], nameValue[1]); - } - } - - public Class<? extends VertexToHive> getVertexToHiveClass() { - return vertexToHiveClass; - } - - /** - * Whether we are writing vertices out. 
- * - * @return true if vertex output enabled - */ - public boolean hasVertexOutput() { - return !skipOutput && vertexToHiveClass != null; - } - - /** - * Set vertex output - * - * @param vertexToHiveClass class for writing vertices to Hive. - * @param tableName Table name - * @param partitionFilter Partition filter, or null if no filter used - */ - public void setVertexOutput( - Class<? extends VertexToHive> vertexToHiveClass, String tableName, - String partitionFilter) { - this.vertexToHiveClass = vertexToHiveClass; - VERTEX_TO_HIVE_CLASS.set(conf, vertexToHiveClass); - HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile"); - HIVE_VERTEX_OUTPUT_TABLE.set(conf, tableName); - if (partitionFilter != null) { - HIVE_VERTEX_OUTPUT_PARTITION.set(conf, - // People often put quotes around partition values by mistake, - // and it's invalid to have it, so remove all quotes from - // partitionFilter - partitionFilter.replaceAll("'", "")); - } - } - - /** - * Check if mapping input is set - * - * @return true if mapping input is set - */ - public boolean hasMappingInput() { - return hiveToMappingClass != null; - } - - /** - * Set mapping input - * - * @param hiveToMappingClass class for reading mapping entries from Hive. - * @param tableName Table name - * @param partitionFilter Partition filter, or null if no filter used - */ - public void setMappingInput( - Class<? 
extends HiveToMapping> hiveToMappingClass, String tableName, - String partitionFilter) { - this.hiveToMappingClass = hiveToMappingClass; - conf.set(HIVE_MAPPING_INPUT.getClassOpt().getKey(), - hiveToMappingClass.getName()); - conf.set(HIVE_MAPPING_INPUT.getProfileIdOpt().getKey(), - "mapping_input_profile"); - conf.set(HIVE_MAPPING_INPUT.getTableOpt().getKey(), tableName); - if (partitionFilter != null) { - conf.set(HIVE_MAPPING_INPUT.getPartitionOpt().getKey(), partitionFilter); - } - } - - /** - * main method - * @param args system arguments - * @throws Exception any errors from Hive Giraph Runner - */ - public static void main(String[] args) throws Exception { - HiveGiraphRunner runner = new HiveGiraphRunner(); - System.exit(ToolRunner.run(runner, args)); - } - - @Override - public final int run(String[] args) throws Exception { - // process args - try { - handleCommandLine(args); - } catch (InterruptedException e) { - return 0; - } catch (IllegalArgumentException e) { - System.err.println(e.getMessage()); - return -1; - } - - // additional configuration for Hive - HiveUtils.addHadoopClasspathToTmpJars(conf); - HiveUtils.addHiveSiteXmlToTmpFiles(conf); - - // setup GiraphJob - GiraphJob job = new GiraphJob(getConf(), getClass().getName()); - GiraphConfiguration giraphConf = job.getConfiguration(); - giraphConf.setComputationClass(computationClass); - - giraphConf.setWorkerConfiguration(workers, workers, 100.0f); - initGiraphJob(job); - - logOptions(giraphConf); - - return job.run(isVerbose) ? 
0 : -1; - } - - /** - * Create ImmutableClassesGiraphConfiguration from provided Configuration - * which is going to copy all the values set to it to this original - * Configuration - * - * @param conf Configuration to create ImmutableClassesGiraphConfiguration - * from and update with any changes to the returned configuration - * @return ImmutableClassesGiraphConfiguration - */ - private ImmutableClassesGiraphConfiguration createGiraphConf( - final Configuration conf) { - return new ImmutableClassesGiraphConfiguration(conf) { - @Override - public void set(String name, String value) { - super.set(name, value); - conf.set(name, value); - } - }; - } - - /** - * Create ImmutableClassesGiraphConfiguration from provided Configuration - * which is going to copy all the values set to it to provided - * InputFormatDescription - * - * @param conf Configuration to create ImmutableClassesGiraphConfiguration - * from - * @param inputFormatDescription InputFormatDescription to update with any - * changes to the returned configuration - * @return ImmutableClassesGiraphConfiguration - */ - private ImmutableClassesGiraphConfiguration createGiraphConf( - Configuration conf, - final InputFormatDescription inputFormatDescription) { - return new ImmutableClassesGiraphConfiguration(conf) { - @Override - public void set(String name, String value) { - super.set(name, value); - inputFormatDescription.addParameter(name, value); - } - }; - } - - /** - * Prepare vertex input settings in Configuration. - * - * For all Hive vertex inputs, add the user settings to the configuration. - * Additionally, this checks the input specs for every input and caches - * metadata information into the configuration to eliminate worker access to - * the metastore and fail earlier in the case that metadata doesn't exist. - * In the case of multiple vertex input descriptions, metadata is cached in - * each vertex input format description and then saved into a single - * Configuration via JSON. 
- */ - @SuppressWarnings("unchecked") - public void prepareHiveVertexInputs() { - if (vertexInputDescriptions.size() == 1) { - GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf, - vertexInputDescriptions.get(0).getInputFormatClass()); - vertexInputDescriptions.get(0).putParametersToConfiguration(conf); - // Create VertexInputFormat in order to initialize the Configuration with - // data from metastore, and check it - createGiraphConf(conf).createWrappedVertexInputFormat() - .checkInputSpecs(conf); - } else { - // For each of the VertexInputFormats we'll prepare Configuration - // parameters - for (int i = 0; i < vertexInputDescriptions.size(); i++) { - // Create a copy of the Configuration in order not to mess up the - // original one - Configuration confCopy = new Configuration(conf); - final VertexInputFormatDescription vertexInputDescription = - vertexInputDescriptions.get(i); - GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(confCopy, - vertexInputDescription.getInputFormatClass()); - vertexInputDescription.putParametersToConfiguration(confCopy); - // Create VertexInputFormat in order to initialize its description with - // data from metastore, and check it - createGiraphConf(confCopy, vertexInputDescription) - .createWrappedVertexInputFormat().checkInputSpecs(confCopy); - } - GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf, - MultiVertexInputFormat.class); - VertexInputFormatDescription.VERTEX_INPUT_FORMAT_DESCRIPTIONS.set(conf, - InputFormatDescription.toJsonString(vertexInputDescriptions)); - } - } - - /** - * Prepare edge input settings in Configuration. - * - * For all Hive edge inputs, add the user settings to the configuration. - * Additionally, this checks the input specs for every input and caches - * metadata information into the configuration to eliminate worker access to - * the metastore and fail earlier in the case that metadata doesn't exist. 
- * In the case of multiple edge input descriptions, metadata is cached in each - * vertex input format description and then saved into a single - * Configuration via JSON. - */ - @SuppressWarnings("unchecked") - public void prepareHiveEdgeInputs() { - if (edgeInputDescriptions.size() == 1) { - GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf, - edgeInputDescriptions.get(0).getInputFormatClass()); - edgeInputDescriptions.get(0).putParametersToConfiguration(conf); - // Create EdgeInputFormat in order to initialize the Configuration with - // data from metastore, and check it - createGiraphConf(conf).createWrappedEdgeInputFormat() - .checkInputSpecs(conf); - } else { - // For each of the EdgeInputFormats we'll prepare Configuration - // parameters - for (int i = 0; i < edgeInputDescriptions.size(); i++) { - // Create a copy of the Configuration in order not to mess up the - // original one - Configuration confCopy = new Configuration(conf); - final EdgeInputFormatDescription edgeInputDescription = - edgeInputDescriptions.get(i); - GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(confCopy, - edgeInputDescription.getInputFormatClass()); - edgeInputDescription.putParametersToConfiguration(confCopy); - // Create EdgeInputFormat in order to initialize its description with - // data from metastore, and check it - createGiraphConf(confCopy, edgeInputDescription) - .createWrappedEdgeInputFormat().checkInputSpecs(confCopy); - } - GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf, - MultiEdgeInputFormat.class); - EdgeInputFormatDescription.EDGE_INPUT_FORMAT_DESCRIPTIONS.set(conf, - InputFormatDescription.toJsonString(edgeInputDescriptions)); - } - } - - /** - * Prepare output settings in Configuration. - * - * This caches metadata information into the configuration to eliminate worker - * access to the metastore. 
- */ - public void prepareHiveOutput() { - GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.set(conf, - HiveVertexOutputFormat.class); - // Output format will be checked by Hadoop, here we only create it in - // order to initialize the Configuration with data from metastore. - // Can't check it here since we don't have JobContext yet - createGiraphConf(conf).createWrappedVertexOutputFormat(); - } - - /** - * Prepare input settings in Configuration - * - * This caches metadata information into the configuration to eliminate worker - * access to the metastore. - */ - public void prepareHiveMappingInput() { - GiraphConstants.MAPPING_INPUT_FORMAT_CLASS.set(conf, - HiveMappingInputFormat.class); - - Configuration confCopy = new Configuration(conf); - createGiraphConf(confCopy) - .createWrappedMappingInputFormat() - .checkInputSpecs(confCopy); - } - - /** - * process arguments - * @param args to process - * @return CommandLine instance - * @throws org.apache.commons.cli.ParseException error parsing arguments - * @throws InterruptedException interrupted - */ - private CommandLine handleCommandLine(String[] args) throws ParseException, - InterruptedException { - Options options = new Options(); - addOptions(options); - addMoreOptions(options); - - CommandLineParser parser = new GnuParser(); - final CommandLine cmdln = parser.parse(options, args); - if (args.length == 0 || cmdln.hasOption("help")) { - new HelpFormatter().printHelp(getClass().getName(), options, true); - throw new InterruptedException(); - } - - // pick up -hiveconf arguments (put -D arguments from command line to conf) - HiveUtils.processHiveconfOptions(cmdln.getOptionValues("hiveconf"), conf); - - // Giraph classes - String computationClassStr = cmdln.getOptionValue("computationClass"); - if (computationClassStr != null) { - computationClass = findClass(computationClassStr, Computation.class); - } - if (computationClass == null) { - throw new IllegalArgumentException( - "Need the Giraph " + 
Computation.class.getSimpleName() + - " class name (-computationClass) to use"); - } - - String mappingInput = cmdln.getOptionValue("mappingInput"); - if (mappingInput != null) { - String[] parameters = split(mappingInput, ",", 3); - if (parameters.length < 2) { - throw new IllegalStateException("Illegal mappingInput description " + - mappingInput + " - HiveToMapping class and table name needed"); - } - setMappingInput(findClass(parameters[0], HiveToMapping.class), - parameters[1], elementOrNull(parameters, 2)); - } - - String[] vertexInputs = cmdln.getOptionValues("vertexInput"); - if (vertexInputs != null && vertexInputs.length != 0) { - vertexInputDescriptions.clear(); - for (String vertexInput : vertexInputs) { - String[] parameters = split(vertexInput, ","); - if (parameters.length < 2) { - throw new IllegalStateException("Illegal vertex input description " + - vertexInput + " - HiveToVertex class and table name needed"); - } - addVertexInput(findClass(parameters[0], HiveToVertex.class), - parameters[1], elementOrNull(parameters, 2), - copyOfArray(parameters, 3)); - } - } - - String[] edgeInputs = cmdln.getOptionValues("edgeInput"); - if (edgeInputs != null && edgeInputs.length != 0) { - edgeInputDescriptions.clear(); - for (String edgeInput : edgeInputs) { - String[] parameters = split(edgeInput, ","); - if (parameters.length < 2) { - throw new IllegalStateException("Illegal edge input description " + - edgeInput + " - HiveToEdge class and table name needed"); - } - addEdgeInput(findClass(parameters[0], HiveToEdge.class), - parameters[1], elementOrNull(parameters, 2), - copyOfArray(parameters, 3)); - } - } - - String output = cmdln.getOptionValue("output"); - if (output != null) { - // Partition filter can contain commas so we limit the number of times - // we split - String[] parameters = split(output, ",", 3); - if (parameters.length < 2) { - throw new IllegalStateException("Illegal output description " + - output + " - VertexToHive class and table name 
needed"); - } - setVertexOutput(findClass(parameters[0], VertexToHive.class), - parameters[1], elementOrNull(parameters, 2)); - } - - if (cmdln.hasOption("skipOutput")) { - skipOutput = true; - } - - if (!hasVertexInput() && !hasEdgeInput()) { - throw new IllegalArgumentException( - "Need at least one of Giraph " + - HiveToVertex.class.getSimpleName() + - " (-vertexInput) and " + - HiveToEdge.class.getSimpleName() + - " (-edgeInput)"); - } - if (vertexToHiveClass == null && !skipOutput) { - throw new IllegalArgumentException( - "Need the Giraph " + VertexToHive.class.getSimpleName() + - " (-output) to use"); - } - String workersStr = cmdln.getOptionValue("workers"); - if (workersStr == null) { - throw new IllegalArgumentException( - "Need to choose the number of workers (-w)"); - } - - String dbName = cmdln.getOptionValue("dbName", "default"); - - workers = Integer.parseInt(workersStr); - - isVerbose = cmdln.hasOption("verbose"); - - // Processing more arguments should precede Hive preparation to - // allow metastore changes (i.e. 
creating tables that don't exist) - processMoreArguments(cmdln); - - if (mappingInput != null) { // mapping input is provided - HIVE_MAPPING_INPUT.getDatabaseOpt().set(conf, dbName); - prepareHiveMappingInput(); - } - - if (hasVertexInput()) { - HIVE_VERTEX_INPUT.getDatabaseOpt().set(conf, dbName); - prepareHiveVertexInputs(); - } - - if (hasEdgeInput()) { - HIVE_EDGE_INPUT.getDatabaseOpt().set(conf, dbName); - prepareHiveEdgeInputs(); - } - - if (!skipOutput) { - HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName); - prepareHiveOutput(); - } else { - LOG.warn("run: Warning - Output will be skipped!"); - } - - return cmdln; - } - - /** - * Add hive-related options to command line parser options - * - * @param options Options to use - */ - private void addOptions(Options options) { - options.addOption("h", "help", false, "Help"); - options.addOption("v", "verbose", false, "Verbose"); - options.addOption("D", "hiveconf", true, - "property=value for Hive/Hadoop configuration"); - options.addOption("w", "workers", true, "Number of workers"); - - if (computationClass == null) { - options.addOption(null, "computationClass", true, - "Giraph Computation class to use"); - } - - options.addOption("db", "dbName", true, "Hive database name"); - - // Mapping input settings - options.addOption("mi", "mappingInput", true, "Giraph " + - HiveToMapping.class.getSimpleName() + " class to use, table name and " + - "partition filter (optional). Example:\n" + - "\"MyHiveToMapping, myTableName, a=1,b=two"); - - // Vertex input settings - options.addOption("vi", "vertexInput", true, getInputOptionDescription( - "vertex", HiveToVertex.class.getSimpleName())); - - // Edge input settings - options.addOption("ei", "edgeInput", true, getInputOptionDescription( - "edge", HiveToEdge.class.getSimpleName())); - - // Vertex output settings - options.addOption("o", "output", true, - "Giraph " + VertexToHive.class.getSimpleName() + " class to use," + - " table name and partition filter (optional). 
Example:\n" + - "\"MyVertexToHive, myTableName, a=1,b=two\""); - options.addOption("s", "skipOutput", false, "Skip output?"); - } - - /** - * Get description for the input format option (vertex or edge). - * - * @param inputType Type of input (vertex or edge) - * @param hiveToObjectClassName HiveToVertex or HiveToEdge - * @return Description for the input format option - */ - private static String getInputOptionDescription(String inputType, - String hiveToObjectClassName) { - StringBuilder inputOption = new StringBuilder(); - inputOption.append("Giraph ").append(hiveToObjectClassName).append( - " class to use, table name and partition filter (optional)."); - inputOption.append(" Additional options for the input format can be " + - "specified as well."); - inputOption.append(" You can set as many ").append(inputType).append( - " inputs as you like."); - inputOption.append(" Example:\n"); - inputOption.append("\"My").append(hiveToObjectClassName).append( - ", myTableName, a<2 AND b='two', option1=value1, option2=value2\""); - return inputOption.toString(); - } - - /** - * - * @param className to find - * @param base base class - * @param <T> class type found - * @return type found - */ - private <T> Class<? extends T> findClass(String className, Class<T> base) { - try { - Class<?> cls = Class.forName(className); - if (base.isAssignableFrom(cls)) { - return cls.asSubclass(base); - } - return null; - } catch (ClassNotFoundException e) { - throw new IllegalArgumentException(className + ": Invalid class name"); - } - } - - @Override - public final Configuration getConf() { - return conf; - } - - @Override - public final void setConf(Configuration conf) { - this.conf = new GiraphConfiguration(conf); - } - - /** - * Override this method to add more command-line options. You can process - * them by also overriding {@link #processMoreArguments(CommandLine)}. 
- * - * @param options Options - */ - protected void addMoreOptions(Options options) { - } - - /** - * Override this method to process additional command-line arguments. You - * may want to declare additional options by also overriding - * {@link #addMoreOptions(org.apache.commons.cli.Options)}. - * - * @param cmd Command - */ - protected void processMoreArguments(CommandLine cmd) { - } - - /** - * Override this method to do additional setup with the GiraphJob that will - * run. - * - * @param job GiraphJob that is going to run - */ - protected void initGiraphJob(GiraphJob job) { } - - /** - * Log the options set by user - * - * @param giraphConf GiraphConfiguration - */ - private void logOptions(GiraphConfiguration giraphConf) { - LOG.info(getClass().getSimpleName() + " with"); - - LOG.info(LOG_PREFIX + "-computationClass=" + - computationClass.getCanonicalName()); - - for (VertexInputFormatDescription description : vertexInputDescriptions) { - LOG.info(LOG_PREFIX + "Vertex input: " + description); - } - - for (EdgeInputFormatDescription description : edgeInputDescriptions) { - LOG.info(LOG_PREFIX + "Edge input: " + description); - } - - if (GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.contains(giraphConf)) { - LOG.info(LOG_PREFIX + "Output: VertexToHive=" + - vertexToHiveClass.getCanonicalName() + ", table=" + - HIVE_VERTEX_OUTPUT_TABLE.get(conf) + ", partition=\"" + - HIVE_VERTEX_OUTPUT_PARTITION.get(conf) + "\""); - } - - LOG.info(LOG_PREFIX + "-workers=" + workers); - } - - /** - * Split a string using separator and trim the results - * - * @param stringToSplit String to split - * @param separator Separator - * @return Separated strings, trimmed - */ - private static String[] split(String stringToSplit, String separator) { - return split(stringToSplit, separator, -1); - } - - /** - * Split a string using separator and trim the results - * - * @param stringToSplit String to split - * @param separator Separator - * @param limit See {@link String#split(String, 
int)} - * @return Separated strings, trimmed - */ - private static String[] split(String stringToSplit, String separator, - int limit) { - Splitter splitter = Splitter.on(separator).trimResults(); - if (limit > 0) { - splitter = splitter.limit(limit); - } - return Iterables.toArray(splitter.split(stringToSplit), String.class); - } - - /** - * Get the element in array at certain position, or null if the position is - * out of array size - * - * @param array Array - * @param position Position - * @return Element at the position or null if the position is out of array - */ - private static String elementOrNull(String[] array, int position) { - return (position < array.length) ? array[position] : null; - } - - /** - * Return a copy of array from some position to the end, - * or empty array if startIndex is out of array size - * - * @param array Array to take a copy from - * @param startIndex Starting position - * @return Copy of part of the array - */ - private static String[] copyOfArray(String[] array, int startIndex) { - if (array.length <= startIndex) { - return new String[0]; - } else { - return Arrays.copyOfRange(array, startIndex, array.length); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/column/HiveReadableColumn.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/column/HiveReadableColumn.java b/giraph-hive/src/main/java/org/apache/giraph/hive/column/HiveReadableColumn.java deleted file mode 100644 index 93bd2c2..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/column/HiveReadableColumn.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.column; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.record.HiveReadableRecord; - -import java.util.List; -import java.util.Map; - -/** - * A single column from a Hive record. - */ -public class HiveReadableColumn { - /** The Hive record */ - private HiveReadableRecord record; - /** The column index to use */ - private int index; - - public int getIndex() { - return index; - } - - public void setIndex(int index) { - this.index = index; - } - - public HiveReadableRecord getRecord() { - return record; - } - - public void setRecord(HiveReadableRecord record) { - this.record = record; - } - - public boolean isNull() { - return record.isNull(index); - } - - /** - * Get type for this column - * - * @return {@link HiveType} - */ - public HiveType hiveType() { - return record.columnType(index); - } - - /** - * Get column value - * - * Regular data columns from the tables should always be placed first, and - * then partition value columns. - * - * If you know the type of the column and it is a primitive you should use - * one of the calls below as it will likely be more efficient. 
- * - * @return Object for column - * @deprecated use {@link #get(com.facebook.hiveio.common.HiveType)} - * or one of the getX() methods - */ - @Deprecated - public Object get() { - return record.get(index); - } - - /** - * Get column value - * - * Regular data columns from the tables should always be placed first, and - * then partition value columns. - * - * You should probably be using one of getX() methods below instead. - * - * @param hiveType HiveType - * @return Object for column - */ - public Object get(HiveType hiveType) { - return record.get(index, hiveType); - } - - /** - * Get boolean value - * - * @return boolean at index - */ - public boolean getBoolean() { - return record.getBoolean(index); - } - - /** - * Get byte value - * - * @return byte at index - */ - public byte getByte() { - return record.getByte(index); - } - - /** - * Get short value - * - * @return short at index - */ - public short getShort() { - return record.getShort(index); - } - - /** - * Get int value - * - * @return int at index - */ - public int getInt() { - return record.getInt(index); - } - - /** - * Get long value - * - * @return long at index - */ - public long getLong() { - return record.getLong(index); - } - - /** - * Get float value - * - * @return float at index - */ - public float getFloat() { - return record.getFloat(index); - } - - /** - * Get double value - * - * @return double at index - */ - public double getDouble() { - return record.getDouble(index); - } - - /** - * Get String column value - * Note that partition values are all strings. - * - * @return String at index - */ - public String getString() { - return record.getString(index); - } - - /** - * Get List column value - * Note that partition values are all strings. - * - * @param <T> item type - * @return List at index - */ - public <T> List<T> getList() { - return record.getList(index); - } - - /** - * Get Map column value - * Note that partition values are all strings. 
- * - * @param <K> key type - * @param <V> value type - * @return Map at index - */ - public <K, V> Map<K, V> getMap() { - return record.getMap(index); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/column/HiveWritableColumn.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/column/HiveWritableColumn.java b/giraph-hive/src/main/java/org/apache/giraph/hive/column/HiveWritableColumn.java deleted file mode 100644 index 5763126..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/column/HiveWritableColumn.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.column; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.record.HiveWritableRecord; - -import java.util.List; -import java.util.Map; - -/** - * A single column to write to a Hive record. 
- */ -public class HiveWritableColumn { - /** The Hive record */ - private HiveWritableRecord record; - /** The column index to use */ - private int index; - - public int getIndex() { - return index; - } - - public void setIndex(int index) { - this.index = index; - } - - public HiveWritableRecord getRecord() { - return record; - } - - public void setRecord(HiveWritableRecord record) { - this.record = record; - } - - /** - * Set value for column. - * - * @param value Data for column - * @deprecated - * use {@link #set(Object, com.facebook.hiveio.common.HiveType)} - * or one of the setX() methods - */ - @Deprecated - public void set(Object value) { - record.set(index, value); - } - - /** - * Set value with type for column. - * - * @param value data for column - * @param hiveType expected hive type - */ - public void set(Object value, HiveType hiveType) { - record.set(index, value, hiveType); - } - - /** - * Set boolean value for column. - * - * @param value Data for column - */ - public void setBoolean(boolean value) { - record.setBoolean(index, value); - } - - /** - * Set byte value for column. - * - * @param value Data for column - */ - public void setByte(byte value) { - record.setByte(index, value); - } - - /** - * Set short value for column. - * - * @param value Data for column - */ - public void setShort(short value) { - record.setShort(index, value); - } - - /** - * Set int value for column. - * - * @param value Data for column - */ - public void setInt(int value) { - record.setInt(index, value); - } - - /** - * Set long value for column. - * - * @param value Data for column - */ - public void setLong(long value) { - record.setLong(index, value); - } - - /** - * Set float value for column. - * - * @param value Data for column - */ - public void setFloat(float value) { - record.setFloat(index, value); - } - - /** - * Set double value for column. 
- * - * @param value Data for column - */ - public void setDouble(double value) { - record.setDouble(index, value); - } - - /** - * Set double value for column. - * - * @param value Data for column - */ - public void setString(String value) { - record.setString(index, value); - } - - /** - * Set List value for column. - * - * @param data Data for column - */ - public void setList(List data) { - record.setList(index, data); - } - - /** - * Set Map value for column. - * - * @param data Data for column - */ - public void setMap(Map data) { - record.setMap(index, data); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/column/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/column/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/column/package-info.java deleted file mode 100644 index 2077c4f..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/column/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Package of Hive per-column IO related things. 
- */ -package org.apache.giraph.hive.column; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java deleted file mode 100644 index cfc696c..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.hive.common; - -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.schema.HiveTableSchema; -import com.facebook.hiveio.schema.HiveTableSchemaAware; - -/** - * Default implementation of {@link HiveTableSchemaAware} and - * {@link org.apache.giraph.conf.ImmutableClassesGiraphConfigurable} - * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - */ -public class DefaultConfigurableAndTableSchemaAware< - I extends WritableComparable, V extends Writable, E extends Writable> - extends DefaultImmutableClassesGiraphConfigurable<I, V, E> - implements HiveTableSchemaAware { - /** Schema stored here */ - private HiveTableSchema tableSchema; - - @Override public void setTableSchema(HiveTableSchema tableSchema) { - this.tableSchema = tableSchema; - } - - @Override public HiveTableSchema getTableSchema() { - return tableSchema; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java deleted file mode 100644 index ab533a2..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.common; - -import org.apache.giraph.conf.ClassConfOption; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.hive.input.mapping.HiveToMapping; -import org.apache.giraph.hive.input.edge.HiveToEdge; -import org.apache.giraph.hive.input.vertex.HiveToVertex; -import org.apache.giraph.hive.output.VertexToHive; - -/** - * Constants for giraph-hive - */ -public class GiraphHiveConstants { - /** Options for configuring mapping input */ - public static final HiveInputOptions<HiveToMapping> HIVE_MAPPING_INPUT = - new HiveInputOptions<>("mapping", HiveToMapping.class); - /** Options for configuring vertex input */ - public static final HiveInputOptions<HiveToVertex> HIVE_VERTEX_INPUT = - new HiveInputOptions<HiveToVertex>("vertex", HiveToVertex.class); - /** Options for configuring edge input */ - public static final HiveInputOptions<HiveToEdge> HIVE_EDGE_INPUT = - new HiveInputOptions<HiveToEdge>("edge", HiveToEdge.class); - - /** Class for converting vertices to Hive records */ - public static final ClassConfOption<VertexToHive> VERTEX_TO_HIVE_CLASS = - ClassConfOption.create("giraph.vertex.to.hive.class", null, - VertexToHive.class, - "Class for converting vertices to Hive records"); - /** Vertex output profile id */ - public static final StrConfOption HIVE_VERTEX_OUTPUT_PROFILE_ID = - new StrConfOption("giraph.hive.output.vertex.profileId", "vertex_output", - "Vertex output profile id"); - /** Vertex output database name */ - public static final StrConfOption HIVE_VERTEX_OUTPUT_DATABASE = - new 
StrConfOption("giraph.hive.output.vertex.database", "default", - "Vertex output database name"); - /** Vertex output table name */ - public static final StrConfOption HIVE_VERTEX_OUTPUT_TABLE = - new StrConfOption("giraph.hive.output.vertex.table", "", - "Vertex output table name"); - /** Vertex output partition */ - public static final StrConfOption HIVE_VERTEX_OUTPUT_PARTITION = - new StrConfOption("giraph.hive.output.vertex.partition", "", - "Vertex output partition"); - - /** Vertex ID hive reader */ - public static final StrConfOption VERTEX_ID_READER_JYTHON_NAME = - new StrConfOption("giraph.hive.jython.vertex.id.reader", null, - "Vertex ID hive reader"); - /** Vertex ID hive writer */ - public static final StrConfOption VERTEX_ID_WRITER_JYTHON_NAME = - new StrConfOption("giraph.hive.jython.vertex.id.writer", null, - "Vertex ID hive writer"); - /** Vertex value hive reader */ - public static final StrConfOption VERTEX_VALUE_READER_JYTHON_NAME = - new StrConfOption("giraph.hive.jython.vertex.value.reader", null, - "Vertex value hive reader"); - /** Vertex value hive writer */ - public static final StrConfOption VERTEX_VALUE_WRITER_JYTHON_NAME = - new StrConfOption("giraph.hive.jython.vertex.value.writer", null, - "Vertex value hive writer"); - /** Edge value hive reader */ - public static final StrConfOption EDGE_VALUE_READER_JYTHON_NAME = - new StrConfOption("giraph.hive.jython.edge.value.reader", null, - "Edge value hive reader"); - - /** Don't construct */ - protected GiraphHiveConstants() { } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java deleted file mode 100644 index a6993dd..0000000 --- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveInputOptions.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.common; - -import org.apache.giraph.conf.ClassConfOption; -import org.apache.giraph.conf.IntConfOption; -import org.apache.giraph.conf.StrConfOption; -import org.apache.hadoop.conf.Configuration; - -import com.facebook.hiveio.input.HiveInputDescription; - -/** - * Holder for Hive Input Configuration options. Used for vertex and edge input. - * @param <C> {@link org.apache.giraph.hive.input.edge.HiveToEdge} or - * {@link org.apache.giraph.hive.input.vertex.HiveToVertex} - */ -public class HiveInputOptions<C> { - /** Class for converting hive records */ - private final ClassConfOption<C> classOpt; - /** Input profile id */ - private final StrConfOption profileIdOpt; - /** Number of splits */ - private final IntConfOption splitsOpt; - /** Input database name */ - private final StrConfOption databaseOpt; - /** Input table name */ - private final StrConfOption tableOpt; - /** Input partition filter */ - private final StrConfOption partitionOpt; - /** Hive Metastore host to use. 
If blank will infer from HiveConf */ - private final StrConfOption hostOpt; - /** Hive Metastore port to use. */ - private final IntConfOption portOpt; - - /** - * Constructor - * @param name "vertex" or "edge" - * @param hiveToTypeClass HiveToVertex or HiveToEdge - */ - public HiveInputOptions(String name, Class<C> hiveToTypeClass) { - classOpt = ClassConfOption.<C>create(key(name, "class"), - null, hiveToTypeClass, "Class for converting hive records"); - profileIdOpt = new StrConfOption(key(name, "profileId"), - name + "_input_profile", "Input profile id"); - partitionOpt = new StrConfOption(key(name, "partition"), "", - "Input partition filter"); - splitsOpt = new IntConfOption(key(name, "splits"), 0, "Number of splits"); - databaseOpt = new StrConfOption(key(name, "database"), "default", - "Input database name"); - tableOpt = new StrConfOption(key(name, "table"), "", "Input table name"); - hostOpt = new StrConfOption(key(name, "metastore.host"), null, - "Hive Metastore host to use. If blank will infer from HiveConf"); - portOpt = new IntConfOption(key(name, "metastore.port"), 9083, - "Hive Metastore port to use."); - } - - /** - * Create Configuration key from name and suffix - * @param name the name - * @param suffix the suffix - * @return key - */ - private static String key(String name, String suffix) { - return "giraph.hive.input." + name + "." + suffix; - } - - /** - * Get profile ID from Configuration - * @param conf Configuration - * @return profile ID - */ - public String getProfileID(Configuration conf) { - return profileIdOpt.get(conf); - } - - /** - * Set HiveToX class to use - * @param conf Configuraton - * @param hiveToTypeClass class to use - */ - public void setClass(Configuration conf, Class<? 
extends C> hiveToTypeClass) { - classOpt.set(conf, hiveToTypeClass); - } - - /** - * Set Database to use - * @param conf Configuration - * @param dbName database - */ - public void setDatabase(Configuration conf, String dbName) { - databaseOpt.set(conf, dbName); - } - - /** - * Set Table to use - * @param conf Configuration - * @param tableName table - */ - public void setTable(Configuration conf, String tableName) { - tableOpt.set(conf, tableName); - } - - /** - * Set partition filter to use - * @param conf Configuration - * @param partitionFilter partition filter - */ - public void setPartition(Configuration conf, String partitionFilter) { - partitionOpt.set(conf, partitionFilter); - } - - /** - * Get HiveToX class set in Configuration - * @param conf Configuration - * @return HiveToX - */ - public Class<? extends C> getClass(Configuration conf) { - return classOpt.get(conf); - } - - public StrConfOption getDatabaseOpt() { - return databaseOpt; - } - - public StrConfOption getHostOpt() { - return hostOpt; - } - - public ClassConfOption<C> getClassOpt() { - return classOpt; - } - - public StrConfOption getPartitionOpt() { - return partitionOpt; - } - - public IntConfOption getPortOpt() { - return portOpt; - } - - public StrConfOption getProfileIdOpt() { - return profileIdOpt; - } - - public IntConfOption getSplitsOpt() { - return splitsOpt; - } - - public StrConfOption getTableOpt() { - return tableOpt; - } - - /** - * Create a HiveInputDescription from the options in the Configuration - * @param conf Configuration - * @return HiveInputDescription - */ - public HiveInputDescription makeInputDescription(Configuration conf) { - HiveInputDescription inputDescription = new HiveInputDescription(); - inputDescription.getTableDesc().setDatabaseName(databaseOpt.get(conf)); - inputDescription.getTableDesc().setTableName(tableOpt.get(conf)); - inputDescription.setPartitionFilter(partitionOpt.get(conf)); - inputDescription.setNumSplits(splitsOpt.get(conf)); - 
inputDescription.getMetastoreDesc().setHost(hostOpt.get(conf)); - inputDescription.getMetastoreDesc().setPort(portOpt.get(conf)); - return inputDescription; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java deleted file mode 100644 index 7ceba23..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveParsing.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.hive.common; - -import org.apache.giraph.edge.Edge; -import org.apache.giraph.edge.EdgeFactory; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; - -import com.facebook.hiveio.record.HiveReadableRecord; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import java.util.List; -import java.util.Map; - -/** - * Helpers for parsing with Hive. - */ -public class HiveParsing { - /** Don't construct */ - private HiveParsing() { } - - /** - * Parse a byte from a Hive record - * @param record Hive record to parse - * @param columnIndex offset of column in row - * @return byte - */ - public static byte parseByte(HiveReadableRecord record, int columnIndex) { - return record.getByte(columnIndex); - } - - /** - * Parse a int from a Hive record - * @param record Hive record to parse - * @param columnIndex offset of column in row - * @return int - */ - public static int parseInt(HiveReadableRecord record, int columnIndex) { - return record.getInt(columnIndex); - } - - /** - * Parse a Integer ID from a Hive record - * @param record Hive record to parse - * @param columnIndex offset of column in row - * @param reusableId Reusable vertex id object - * @return IntWritable ID - */ - public static IntWritable parseIntID(HiveReadableRecord record, - int columnIndex, IntWritable reusableId) { - reusableId.set(parseInt(record, columnIndex)); - return reusableId; - } - - /** - * Parse a Long ID from a Hive record - * @param record Hive record to parse - * @param columnIndex offset of column in row - * @param reusableId Reusable vertex id object - * @return LongWritable ID - */ - public static LongWritable parseLongID(HiveReadableRecord record, - int columnIndex, LongWritable reusableId) { - reusableId.set(record.getLong(columnIndex)); - return reusableId; - } - - /** - * Parse a weight from a 
Hive record - * @param record Hive record to parse - * @param columnIndex offset of column in row - * @param reusableDoubleWritable Reusable DoubleWritable object - * - * @return DoubleWritable weight - */ - public static DoubleWritable parseDoubleWritable(HiveReadableRecord record, - int columnIndex, DoubleWritable reusableDoubleWritable) { - reusableDoubleWritable.set(record.getDouble(columnIndex)); - return reusableDoubleWritable; - } - - /** - * Parse edges as mappings of integer => double (id to weight) - * @param record Hive record to parse - * @param columnIndex offset of column in row - * @return edges - */ - @SuppressWarnings("unchecked") - public static Iterable<Edge<IntWritable, DoubleWritable>> parseIntDoubleEdges( - HiveReadableRecord record, int columnIndex) { - Object edgesObj = record.get(columnIndex); - if (edgesObj == null) { - return ImmutableList.of(); - } - Map<Long, Double> readEdges = (Map<Long, Double>) edgesObj; - List<Edge<IntWritable, DoubleWritable>> edges = - Lists.newArrayListWithCapacity(readEdges.size()); - for (Map.Entry<Long, Double> entry : readEdges.entrySet()) { - edges.add(EdgeFactory.create(new IntWritable(entry.getKey().intValue()), - new DoubleWritable(entry.getValue()))); - } - return edges; - } - - /** - * Parse edges from a list - * @param record hive record - * @param index column index - * @return iterable of edges - */ - public static Iterable<Edge<IntWritable, NullWritable>> parseIntNullEdges( - HiveReadableRecord record, int index) { - List<Long> ids = (List<Long>) record.get(index); - if (ids == null) { - return ImmutableList.of(); - } - ImmutableList.Builder<Edge<IntWritable, NullWritable>> builder = - ImmutableList.builder(); - for (long id : ids) { - builder.add(EdgeFactory.create(new IntWritable((int) id))); - } - return builder.build(); - } - - /** - * Parse edges as mappings of long => double (id to weight) - * @param record Hive record to parse - * @param columnIndex offset of column in row - * @return edges 
- */ - @SuppressWarnings("unchecked") - public static Iterable<Edge<LongWritable, DoubleWritable>> - parseLongDoubleEdges(HiveReadableRecord record, int columnIndex) { - Object edgesObj = record.get(columnIndex); - if (edgesObj == null) { - return ImmutableList.of(); - } - Map<Long, Double> readEdges = (Map<Long, Double>) edgesObj; - List<Edge<LongWritable, DoubleWritable>> edges = - Lists.newArrayListWithCapacity(readEdges.size()); - for (Map.Entry<Long, Double> entry : readEdges.entrySet()) { - edges.add(EdgeFactory.create(new LongWritable(entry.getKey()), - new DoubleWritable(entry.getValue()))); - } - return edges; - } -}
