http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java deleted file mode 100644 index 35d8b3e..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.common; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.hive.input.mapping.HiveToMapping; -import org.apache.giraph.hive.input.edge.HiveToEdge; -import org.apache.giraph.hive.input.vertex.HiveToVertex; -import org.apache.giraph.hive.output.VertexToHive; -import org.apache.giraph.utils.ReflectionUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -import com.facebook.hiveio.schema.HiveTableSchema; -import com.facebook.hiveio.schema.HiveTableSchemas; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import java.io.File; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import static java.lang.System.getenv; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_MAPPING_INPUT; -import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT; -import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS; - -/** - * Utility methods for Hive IO - */ -@SuppressWarnings("unchecked") -public class HiveUtils { - /** Logger */ - private static final Logger LOG = Logger.getLogger(HiveUtils.class); - - /** Do not instantiate */ - private HiveUtils() { - } - - /** - * @param outputTablePartitionString table partition string - * @return Map - */ - public static Map<String, String> parsePartitionValues( - String outputTablePartitionString) { - if (outputTablePartitionString == null) { - return null; - } - Splitter commaSplitter = Splitter.on(',').omitEmptyStrings().trimResults(); - Splitter equalSplitter = Splitter.on('=').omitEmptyStrings().trimResults(); - Map<String, String> partitionValues = Maps.newHashMap(); - for (String keyValStr : commaSplitter.split(outputTablePartitionString)) { - List<String> keyVal = Lists.newArrayList(equalSplitter.split(keyValStr)); - if (keyVal.size() != 2) { - throw new IllegalArgumentException( - "Unrecognized partition value format: " + - outputTablePartitionString); - } - partitionValues.put(keyVal.get(0), keyVal.get(1)); - } - return partitionValues; - } - - /** - * Lookup index of column in {@link HiveTableSchema}, or throw if not found. - * - * @param schema {@link HiveTableSchema} - * @param columnName column name - * @return column index - */ - public static int columnIndexOrThrow(HiveTableSchema schema, - String columnName) { - int index = schema.positionOf(columnName); - if (index == -1) { - throw new IllegalArgumentException("Column " + columnName + - " not found in table " + schema.getTableDesc()); - } - return index; - } - - /** - * Lookup index of column in {@link HiveTableSchema}, or throw if not found. - * - * @param schema {@link HiveTableSchema} - * @param conf {@link Configuration} - * @param confOption {@link StrConfOption} - * @return column index - */ - public static int columnIndexOrThrow(HiveTableSchema schema, - Configuration conf, StrConfOption confOption) { - String columnName = confOption.get(conf); - if (columnName == null) { - throw new IllegalArgumentException("Column " + confOption.getKey() + - " not set in configuration"); - } - return columnIndexOrThrow(schema, columnName); - } - - /** - * Add hive-site.xml file to tmpfiles in Configuration. - * - * @param conf Configuration - */ - public static void addHiveSiteXmlToTmpFiles(Configuration conf) { - // When output partitions are used, workers register them to the - // metastore at cleanup stage, and on HiveConf's initialization, it - // looks for hive-site.xml. - addToHiveFromClassLoader(conf, "hive-site.xml"); - } - - /** - * Add hive-site-custom.xml to tmpfiles in Configuration. - * - * @param conf Configuration - */ - public static void addHiveSiteCustomXmlToTmpFiles(Configuration conf) { - addToHiveFromClassLoader(conf, "hive-site-custom.xml"); - addToHiveFromEnv(conf, "HIVE_HOME", "conf/hive-site.xml"); - } - - /** - * Add a file to Configuration tmpfiles from environment variable - * - * @param conf Configuration - * @param envKey environment variable key - * @param path search path - * @return true if file found and added, false otherwise - */ - private static boolean addToHiveFromEnv(Configuration conf, - String envKey, String path) { - String envValue = getenv(envKey); - if (envValue == null) { - return false; - } - File file = new File(envValue, path); - if (file.exists()) { - LOG.info("addToHiveFromEnv: Adding " + file.getPath() + - " to Configuration tmpfiles"); - } - try { - addToStringCollection(conf, "tmpfiles", file.toURI().toURL().toString()); - } catch (MalformedURLException e) { - LOG.error("Failed to get URL for file " + file); - } - return true; - } - - /** - * Add a file to Configuration tmpfiles from ClassLoader resource - * - * @param conf Configuration - * @param name file name - * @return true if file found in class loader, false otherwise - */ - private static boolean addToHiveFromClassLoader(Configuration conf, - String name) { - URL url = conf.getClassLoader().getResource(name); - if (url == null) { - return false; - } - if (LOG.isInfoEnabled()) { - LOG.info("addToHiveFromClassLoader: Adding " + name + " at " + - url + " to Configuration tmpfiles"); - } - addToStringCollection(conf, "tmpfiles", url.toString()); - return true; - } - - /** - * Add jars from HADOOP_CLASSPATH environment variable to tmpjars property - * in Configuration. - * - * @param conf Configuration - */ - public static void addHadoopClasspathToTmpJars(Configuration conf) { - // Or, more effectively, we can provide all the jars client needed to - // the workers as well - String hadoopClasspath = getenv("HADOOP_CLASSPATH"); - if (hadoopClasspath == null) { - return; - } - String[] hadoopJars = hadoopClasspath.split(File.pathSeparator); - if (hadoopJars.length > 0) { - List<String> hadoopJarURLs = Lists.newArrayList(); - for (String jarPath : hadoopJars) { - File file = new File(jarPath); - if (file.exists() && file.isFile()) { - hadoopJarURLs.add(file.toURI().toString()); - } - } - HiveUtils.addToStringCollection(conf, "tmpjars", hadoopJarURLs); - } - } - - /** - * Handle -hiveconf options, adding them to Configuration - * - * @param hiveconfArgs array of hiveconf args - * @param conf Configuration - */ - public static void processHiveconfOptions(String[] hiveconfArgs, - Configuration conf) { - for (String hiveconf : hiveconfArgs) { - processHiveconfOption(conf, hiveconf); - } - } - - /** - * Process -hiveconf option, adding it to Configuration appropriately. - * - * @param conf Configuration - * @param hiveconf option to process - */ - public static void processHiveconfOption(Configuration conf, - String hiveconf) { - String[] keyval = hiveconf.split("=", 2); - if (keyval.length == 2) { - String name = keyval[0]; - String value = keyval[1]; - if (name.equals("tmpjars") || name.equals("tmpfiles")) { - addToStringCollection(conf, name, value); - } else { - conf.set(name, value); - } - } - } - - /** - * Add string to collection - * - * @param conf Configuration - * @param key key to add - * @param values values for collection - */ - public static void addToStringCollection(Configuration conf, String key, - String... values) { - addToStringCollection(conf, key, Arrays.asList(values)); - } - - /** - * Add string to collection - * - * @param conf Configuration - * @param key to add - * @param values values for collection - */ - public static void addToStringCollection( - Configuration conf, String key, Collection<String> values) { - Collection<String> strings = conf.getStringCollection(key); - strings.addAll(values); - conf.setStrings(key, strings.toArray(new String[strings.size()])); - } - - /** - * Create a new VertexToHive - * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - * @param conf Configuration - * @param schema Hive table schema - * @return VertexToHive - * @throws IOException on any instantiation errors - */ - public static <I extends WritableComparable, V extends Writable, - E extends Writable> VertexToHive<I, V, E> newVertexToHive( - ImmutableClassesGiraphConfiguration<I, V, E> conf, - HiveTableSchema schema) throws IOException { - Class<? extends VertexToHive> klass = VERTEX_TO_HIVE_CLASS.get(conf); - if (klass == null) { - throw new IOException(VERTEX_TO_HIVE_CLASS.getKey() + - " not set in conf"); - } - return newInstance(klass, conf, schema); - } - - /** - * Create a new HiveToEdge - * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - * @param conf Configuration - * @param schema Hive table schema - * @return HiveToVertex - */ - public static <I extends WritableComparable, V extends Writable, - E extends Writable> HiveToEdge<I, E> newHiveToEdge( - ImmutableClassesGiraphConfiguration<I, V, E> conf, - HiveTableSchema schema) { - Class<? extends HiveToEdge> klass = HIVE_EDGE_INPUT.getClass(conf); - if (klass == null) { - throw new IllegalArgumentException( - HIVE_EDGE_INPUT.getClassOpt().getKey() + " not set in conf"); - } - return newInstance(klass, conf, schema); - } - - /** - * Create a new HiveToVertex - * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - * @param conf Configuration - * @param schema Hive table schema - * @return HiveToVertex - */ - public static <I extends WritableComparable, V extends Writable, - E extends Writable> HiveToVertex<I, V, E> newHiveToVertex( - ImmutableClassesGiraphConfiguration<I, V, E> conf, - HiveTableSchema schema) { - Class<? extends HiveToVertex> klass = HIVE_VERTEX_INPUT.getClass(conf); - if (klass == null) { - throw new IllegalArgumentException( - HIVE_VERTEX_INPUT.getClassOpt().getKey() + " not set in conf"); - } - return newInstance(klass, conf, schema); - } - - /** - * Create a new HiveToMapping - * - * @param conf ImmutableClassesGiraphConfiguration - * @param schema HiveTableSchema - * @param <I> vertexId type - * @param <V> vertexValue type - * @param <E> edgeValue type - * @param <B> mappingTarget type - * @return HiveToMapping - */ - public static <I extends WritableComparable, V extends Writable, - E extends Writable, B extends Writable> - HiveToMapping<I, B> newHiveToMapping( - ImmutableClassesGiraphConfiguration<I, V, E> conf, - HiveTableSchema schema) { - Class<? extends HiveToMapping> klass = HIVE_MAPPING_INPUT.getClass(conf); - if (klass == null) { - throw new IllegalArgumentException( - HIVE_MAPPING_INPUT.getClassOpt().getKey() + " not set in conf" - ); - } - return newInstance(klass, conf, schema); - } - - /** - * Create a new instance of a class, configuring it and setting the Hive table - * schema if it supports those types. - * - * @param klass Class to create - * @param conf {@link ImmutableClassesGiraphConfiguration} to configure with - * @param schema {@link HiveTableSchema} from Hive to set - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - * @param <T> type being created - * @return new object of type <T> - */ - public static - <I extends WritableComparable, V extends Writable, E extends Writable, T> - T newInstance(Class<T> klass, - ImmutableClassesGiraphConfiguration<I, V, E> conf, - HiveTableSchema schema) { - T object = ReflectionUtils.<T>newInstance(klass, conf); - HiveTableSchemas.configure(object, schema); - return object; - } -}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/common/LanguageAndType.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/LanguageAndType.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/LanguageAndType.java deleted file mode 100644 index 7a2789a..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/LanguageAndType.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.common; - -import org.apache.giraph.graph.Language; - -/** - * A class/type with the language. - */ -public class LanguageAndType { - /** Language programmed in */ - private final Language language; - /** Java class name */ - private final Class javaClass; - /** Jython class name */ - private final String jythonClassName; - - /** - * Constructor - * - * @param language Language - * @param javaClass java class - * @param jythonClassName jython class name - */ - private LanguageAndType(Language language, Class javaClass, - String jythonClassName) { - this.javaClass = javaClass; - this.language = language; - this.jythonClassName = jythonClassName; - } - - /** - * create for jython class - * - * @param name jython class name - * @return new instance - */ - public static LanguageAndType jython(String name) { - return new LanguageAndType(Language.JYTHON, null, name); - } - - /** - * create for java class - * - * @param klass class - * @return new instance - */ - public static LanguageAndType java(Class klass) { - return new LanguageAndType(Language.JAVA, klass, null); - } - - public Class getJavaClass() { - return javaClass; - } - - public String getJythonClassName() { - return jythonClassName; - } - - public Language getLanguage() { - return language; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java deleted file mode 100644 index 1535b18..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Common Hive related utilities. - */ -package org.apache.giraph.hive.common; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java deleted file mode 100644 index 7572f0c..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/HiveInputChecker.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.input; - -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.schema.HiveTableSchema; - -/** - * Interface to check the validity of a Hive input configuration. - */ -public interface HiveInputChecker { - /** - * Check the input is valid. This method provides information to the user as - * early as possible so that they may validate they are using the correct - * input and fail the job early rather than getting into it and waiting a long - * time only to find out something was misconfigured. - * - * @param inputDesc HiveInputDescription - * @param schema Hive table schema - */ - void checkInput(HiveInputDescription inputDesc, HiveTableSchema schema); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java deleted file mode 100644 index 5c941c6..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/RecordReaderWrapper.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input; - -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.RecordReader; - -import com.google.common.collect.AbstractIterator; - -import java.io.IOException; - -/** - * Wraps {@link RecordReader} into {@link java.util.Iterator} - * - * @param <T> Data of record reader - */ -public class RecordReaderWrapper<T> extends AbstractIterator<T> { - /** Wrapped {@link RecordReader} */ - private final RecordReader<WritableComparable, T> recordReader; - - /** - * Constructor - * - * @param recordReader {@link RecordReader} to wrap - */ - public RecordReaderWrapper(RecordReader<WritableComparable, T> recordReader) { - this.recordReader = recordReader; - } - - @Override - protected T computeNext() { - try { - if (!recordReader.nextKeyValue()) { - endOfData(); - return null; - } - return recordReader.getCurrentValue(); - } catch (IOException | InterruptedException e) { - throw new IllegalStateException( - "computeNext: Unexpected exception occurred", e); - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java deleted file mode 100644 index 9fe5a39..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/AbstractHiveToEdge.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.input.edge; - -import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Base class for HiveToEdge implementations - * - * @param <I> Vertex ID - * @param <E> Edge Value - */ -public abstract class AbstractHiveToEdge<I extends WritableComparable, - E extends Writable> - extends DefaultConfigurableAndTableSchemaAware<I, Writable, E> - implements HiveToEdge<I, E> { - @Override - public final void remove() { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java deleted file mode 100644 index 6ba2aec..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.edge; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.hive.common.GiraphHiveConstants; -import org.apache.giraph.hive.common.HiveUtils; -import org.apache.giraph.io.EdgeInputFormat; -import org.apache.giraph.io.EdgeReader; -import org.apache.giraph.io.iterables.EdgeReaderWrapper; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import com.facebook.hiveio.input.HiveApiInputFormat; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -import java.io.IOException; -import java.util.List; - -/** - * {@link EdgeInputFormat} for reading edges from Hive. - * - * @param <I> Vertex id - * @param <E> Edge value - */ -public class HiveEdgeInputFormat<I extends WritableComparable, - E extends Writable> extends EdgeInputFormat<I, E> { - /** Underlying Hive InputFormat used */ - private final HiveApiInputFormat hiveInputFormat; - - /** - * Create edge input format. - */ - public HiveEdgeInputFormat() { - hiveInputFormat = new HiveApiInputFormat(); - } - - @Override public void checkInputSpecs(Configuration conf) { - HiveInputDescription inputDesc = - GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf); - HiveTableSchema schema = getTableSchema(); - HiveToEdge<I, E> hiveToEdge = HiveUtils.newHiveToEdge(getConf(), schema); - hiveToEdge.checkInput(inputDesc, schema); - } - - @Override - public void setConf( - ImmutableClassesGiraphConfiguration<I, Writable, E> conf) { - super.setConf(conf); - hiveInputFormat.initialize( - GiraphHiveConstants.HIVE_EDGE_INPUT.makeInputDescription(conf), - GiraphHiveConstants.HIVE_EDGE_INPUT.getProfileID(conf), - conf); - } - - @Override - public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) - throws IOException, InterruptedException { - return hiveInputFormat.getSplits(context); - } - - @Override - public EdgeReader<I, E> createEdgeReader(InputSplit split, - TaskAttemptContext context) - throws IOException { - - HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>(); - reader.setTableSchema(getTableSchema()); - - RecordReader<WritableComparable, HiveReadableRecord> baseReader; - try { - baseReader = hiveInputFormat.createRecordReader(split, context); - } catch (InterruptedException e) { - throw new IllegalStateException("Could not create edge record reader", e); - } - - reader.setHiveRecordReader(baseReader); - return new EdgeReaderWrapper<I, E>(reader); - } - - /** - * Get Hive table schema - * - * @return Hive table schema - */ - private HiveTableSchema getTableSchema() { - return hiveInputFormat.getTableSchema(getConf()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java deleted file mode 100644 index cc1dcd5..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.edge; - -import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware; -import org.apache.giraph.hive.common.HiveUtils; -import org.apache.giraph.hive.input.RecordReaderWrapper; -import org.apache.giraph.io.iterables.EdgeWithSource; -import org.apache.giraph.io.iterables.GiraphReader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import com.facebook.hiveio.record.HiveReadableRecord; - -import java.io.IOException; - -/** - * A reader for reading edges from Hive. - * - * @param <I> Vertex ID - * @param <E> Edge Value - */ -public class HiveEdgeReader<I extends WritableComparable, E extends Writable> - extends DefaultConfigurableAndTableSchemaAware<I, Writable, E> - implements GiraphReader<EdgeWithSource<I, E>> { - /** Underlying Hive RecordReader used */ - private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader; - - /** User class to create edges from a HiveRecord */ - private HiveToEdge<I, E> hiveToEdge; - - /** - * Get underlying Hive record reader used. - * - * @return RecordReader from Hive - */ - public RecordReader<WritableComparable, HiveReadableRecord> - getHiveRecordReader() { - return hiveRecordReader; - } - - /** - * Set underlying Hive record reader used. - * - * @param hiveRecordReader RecordReader to read from Hive. - */ - public void setHiveRecordReader( - RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) { - this.hiveRecordReader = hiveRecordReader; - } - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext context) - throws IOException, InterruptedException { - hiveRecordReader.initialize(inputSplit, context); - hiveToEdge = HiveUtils.newHiveToEdge(getConf(), getTableSchema()); - hiveToEdge.initializeRecords( - new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader)); - } - - @Override - public void close() throws IOException { - hiveRecordReader.close(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return hiveRecordReader.getProgress(); - } - - @Override - public boolean hasNext() { - return hiveToEdge.hasNext(); - } - - @Override - public EdgeWithSource<I, E> next() { - return hiveToEdge.next(); - } - - @Override - public void remove() { - hiveToEdge.remove(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java deleted file mode 100644 index 1782114..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.edge; - -import org.apache.giraph.hive.input.HiveInputChecker; -import org.apache.giraph.io.iterables.EdgeWithSource; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.record.HiveReadableRecord; - -import java.util.Iterator; - -/** - * An interface used to create edges from Hive records. - * - * It gets initialized with HiveRecord iterator, and it needs to provide an - * iterator over edges, so it's possible to skip some rows from the input, - * combine several rows together, etc. - * - * @param <I> Vertex ID - * @param <E> Edge Value - */ -public interface HiveToEdge<I extends WritableComparable, - E extends Writable> extends Iterator<EdgeWithSource<I, E>>, - HiveInputChecker { - /** - * Set the records which contain edge input data - * - * @param records Hive records - */ - void initializeRecords(Iterator<HiveReadableRecord> records); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java deleted file mode 100644 index 55d9299..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.edge; - -import org.apache.giraph.io.iterables.EdgeWithSource; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.record.HiveReadableRecord; - -import java.util.Iterator; - -/** - * Simple implementation of {@link HiveToEdge} when each edge is in the one - * row of the input. - * - * @param <I> Vertex id - * @param <E> Edge data - */ -public abstract class SimpleHiveToEdge<I extends WritableComparable, - E extends Writable> extends AbstractHiveToEdge<I, E> { - /** Iterator over input records */ - private Iterator<HiveReadableRecord> records; - /** Reusable {@link EdgeWithSource} object */ - private EdgeWithSource<I, E> reusableEdge = new EdgeWithSource<I, E>(); - - /** - * Read source vertex ID from Hive record - * - * @param hiveRecord HiveRecord to read from - * @return source vertex ID - */ - public abstract I getSourceVertexId(HiveReadableRecord hiveRecord); - - /** - * Read target vertex ID from Hive record - * - * @param hiveRecord HiveRecord to read from - * @return target vertex ID - */ - public abstract I getTargetVertexId(HiveReadableRecord hiveRecord); - - /** - * Read edge value from the Hive record. - * - * @param hiveRecord HiveRecord to read from - * @return Edge value - */ - public abstract E getEdgeValue(HiveReadableRecord hiveRecord); - - @Override - public void initializeRecords(Iterator<HiveReadableRecord> records) { - this.records = records; - reusableEdge.setSourceVertexId(getConf().createVertexId()); - reusableEdge.setEdge(getConf().createReusableEdge()); - } - - @Override - public boolean hasNext() { - return records.hasNext(); - } - - @Override - public EdgeWithSource<I, E> next() { - HiveReadableRecord record = records.next(); - reusableEdge.setSourceVertexId(getSourceVertexId(record)); - reusableEdge.setTargetVertexId(getTargetVertexId(record)); - reusableEdge.setEdgeValue(getEdgeValue(record)); - return reusableEdge; - } - - protected I getReusableSourceVertexId() { - return reusableEdge.getSourceVertexId(); - } - - protected I getReusableTargetVertexId() { - return reusableEdge.getTargetVertexId(); - } - - protected E getReusableEdgeValue() { - return reusableEdge.getEdgeValue(); - } -} - http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java deleted file mode 100644 index 55ccca3..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntDoubleEdge.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.input.edge.examples; - -import org.apache.giraph.hive.common.HiveParsing; -import org.apache.giraph.hive.input.edge.SimpleHiveToEdge; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.IntWritable; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.input.parser.Records; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -/** - * A simple HiveToEdge with integer IDs and double edge values. - */ -public class HiveIntDoubleEdge - extends SimpleHiveToEdge<IntWritable, DoubleWritable> { - @Override public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { - Records.verifyType(0, HiveType.INT, schema); - Records.verifyType(1, HiveType.INT, schema); - Records.verifyType(2, HiveType.DOUBLE, schema); - } - - @Override - public DoubleWritable getEdgeValue(HiveReadableRecord hiveRecord) { - return HiveParsing.parseDoubleWritable(hiveRecord, 2, - getReusableEdgeValue()); - } - - @Override - public IntWritable getSourceVertexId(HiveReadableRecord hiveRecord) { - return HiveParsing.parseIntID(hiveRecord, 0, getReusableSourceVertexId()); - } - - @Override - public IntWritable getTargetVertexId(HiveReadableRecord hiveRecord) { - return HiveParsing.parseIntID(hiveRecord, 1, getReusableTargetVertexId()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java deleted file mode 100644 index cc1013c..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/HiveIntNullEdge.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.hive.input.edge.examples; - -import org.apache.giraph.hive.common.HiveParsing; -import org.apache.giraph.hive.input.edge.SimpleHiveToEdge; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.log4j.Logger; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.input.parser.Records; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -/** - * A simple HiveToEdge with integer IDs, no edge value, that assumes the Hive - * table is made up of [source,target] columns. - */ -public class HiveIntNullEdge - extends SimpleHiveToEdge<IntWritable, NullWritable> { - /** Logger */ - private static final Logger LOG = Logger.getLogger(HiveIntNullEdge.class); - - @Override public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { - Records.verifyType(0, HiveType.INT, schema); - Records.verifyType(1, HiveType.INT, schema); - } - - @Override public NullWritable getEdgeValue(HiveReadableRecord hiveRecord) { - return NullWritable.get(); - } - - @Override - public IntWritable getSourceVertexId(HiveReadableRecord hiveRecord) { - return HiveParsing.parseIntID(hiveRecord, 0, getReusableSourceVertexId()); - } - - @Override - public IntWritable getTargetVertexId(HiveReadableRecord hiveRecord) { - return HiveParsing.parseIntID(hiveRecord, 1, getReusableTargetVertexId()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/package-info.java deleted file mode 100644 index 56ff60b..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/examples/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Hive input edge examples. - */ -package org.apache.giraph.hive.input.edge.examples; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java deleted file mode 100644 index 092ea39..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Hive Edge input related things. - */ -package org.apache.giraph.hive.input.edge; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java deleted file mode 100644 index dc7a6ee..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.mapping; - -import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * AbstractHiveToMapping - * - * @param <I> vertexId type parameter - * @param <B> mapping target type parameter - */ -public abstract class AbstractHiveToMapping<I extends WritableComparable, - B extends Writable> - extends DefaultConfigurableAndTableSchemaAware<I, Writable, Writable> - implements HiveToMapping<I, B> { - @Override - public final void remove() { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java deleted file mode 100644 index 973813d..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.mapping; - -import com.facebook.hiveio.input.HiveApiInputFormat; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.hive.common.GiraphHiveConstants; -import org.apache.giraph.io.MappingInputFormat; -import org.apache.giraph.io.MappingReader; -import org.apache.giraph.io.iterables.MappingReaderWrapper; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.util.List; - -import static org.apache.giraph.hive.common.HiveUtils.newHiveToMapping; - -/** - * HiveMappingInputFormat extends MappingInputFormat - * - * @param <I> vertexId type - * @param <V> vertexValue type - * @param <E> edgeValue type - * @param <B> mappingTarget type - */ -public class HiveMappingInputFormat<I extends WritableComparable, - V extends Writable, E extends Writable, B extends Writable> - extends MappingInputFormat<I, V, E, B> { - /** Underlying Hive InputFormat used */ - private final HiveApiInputFormat hiveInputFormat; - - /** - * Create vertex input format - */ - public HiveMappingInputFormat() { - hiveInputFormat = new HiveApiInputFormat(); - } - - @Override - public void checkInputSpecs(Configuration conf) { - HiveInputDescription inputDesc = - GiraphHiveConstants.HIVE_MAPPING_INPUT.makeInputDescription(conf); - HiveTableSchema schema = getTableSchema(); - HiveToMapping<I, B> hiveToMapping = newHiveToMapping(getConf(), schema); - hiveToMapping.checkInput(inputDesc, schema); - } - - - @Override - public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) { - super.setConf(conf); - hiveInputFormat.initialize( - GiraphHiveConstants.HIVE_MAPPING_INPUT.makeInputDescription(conf), - GiraphHiveConstants.HIVE_MAPPING_INPUT.getProfileID(conf), - conf); - } - - - @Override - public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) - throws IOException, InterruptedException { - return hiveInputFormat.getSplits(context); - } - - @Override - public MappingReader<I, V, E, B> createMappingReader(InputSplit split, - TaskAttemptContext context) throws IOException { - HiveMappingReader<I, B> reader = new HiveMappingReader<>(); - reader.setTableSchema(getTableSchema()); - - RecordReader<WritableComparable, HiveReadableRecord> baseReader; - try { - baseReader = hiveInputFormat.createRecordReader(split, context); - } catch (InterruptedException e) { - throw new IOException("Could not create map reader", e); - } - - reader.setHiveRecordReader(baseReader); - return new MappingReaderWrapper<>(reader); - } - - - /** - * Get Hive table schema - * - * @return Hive table schema - */ - private HiveTableSchema getTableSchema() { - return hiveInputFormat.getTableSchema(getConf()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java deleted file mode 100644 index 3154f9d..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.mapping; - -import com.facebook.hiveio.record.HiveReadableRecord; -import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware; -import org.apache.giraph.hive.common.HiveUtils; -import org.apache.giraph.hive.input.RecordReaderWrapper; -import org.apache.giraph.io.iterables.GiraphReader; -import org.apache.giraph.mapping.MappingEntry; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; - -/** - * MappingReader using Hive - * - * @param <I> vertexId type - * @param <B> mappingTarget type - */ -public class HiveMappingReader<I extends WritableComparable, - B extends Writable> - extends DefaultConfigurableAndTableSchemaAware<I, Writable, Writable> - implements GiraphReader<MappingEntry<I, B>> { - /** Underlying Hive RecordReader used */ - private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader; - /** Hive To Mapping */ - private HiveToMapping<I, B> hiveToMapping; - - /** - * Get hiverecord reader - * - * @return hiveRecordReader - */ - public RecordReader<WritableComparable, HiveReadableRecord> - getHiveRecordReader() { - return hiveRecordReader; - } - - public void setHiveRecordReader( - RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) { - this.hiveRecordReader = hiveRecordReader; - } - - @Override - public void initialize(InputSplit inputSplit, - TaskAttemptContext context) throws IOException, InterruptedException { - hiveRecordReader.initialize(inputSplit, context); - hiveToMapping = HiveUtils.newHiveToMapping(getConf(), getTableSchema()); - hiveToMapping.initializeRecords( - new RecordReaderWrapper<>(hiveRecordReader)); - } - - @Override - public void close() throws IOException { - hiveRecordReader.close(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return hiveRecordReader.getProgress(); - } - - @Override - public boolean hasNext() { - return hiveToMapping.hasNext(); - } - - - @Override - public MappingEntry<I, B> next() { - return hiveToMapping.next(); - } - - @Override - public void remove() { - hiveToMapping.remove(); - } - -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java deleted file mode 100644 index 497b044..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.mapping; - -import com.facebook.hiveio.record.HiveReadableRecord; -import org.apache.giraph.hive.input.HiveInputChecker; -import org.apache.giraph.mapping.MappingEntry; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import java.util.Iterator; - -/** - * HiveToMapping interface - * - * @param <I> vertexId type - * @param <B> mappingTarget type - */ -public interface HiveToMapping<I extends WritableComparable, - B extends Writable> extends - Iterator<MappingEntry<I, B>>, HiveInputChecker { - /** - * Set the records which contain vertex input data - * - * @param records Hive records - */ - void initializeRecords(Iterator<HiveReadableRecord> records); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java deleted file mode 100644 index feccc1f..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.mapping; - -import com.facebook.hiveio.record.HiveReadableRecord; -import org.apache.giraph.mapping.MappingEntry; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import java.util.Iterator; - -/** - * SimpleHiveToMapping - convenient class for HiveToMapping - * - * @param <I> vertexId type - * @param <B> mappingTarget type - */ -@SuppressWarnings("unchecked") -public abstract class SimpleHiveToMapping<I extends WritableComparable, - B extends Writable> extends AbstractHiveToMapping<I, B> { - /** Hive records which we are reading from */ - private Iterator<HiveReadableRecord> records; - - /** Reusable entry object */ - private MappingEntry<I, B> reusableEntry; - - /** Reusable vertex id */ - private I reusableVertexId; - /** Reusable mapping target */ - private B reusableMappingTarget; - - /** - * Read vertexId from hive record - * - * @param record HiveReadableRecord - * @return vertexId - */ - public abstract I getVertexId(HiveReadableRecord record); - - /** - * Read mappingTarget from hive record - * - * @param record HiveReadableRecord - * @return mappingTarget - */ - public abstract B getMappingTarget(HiveReadableRecord record); - - @Override - public void initializeRecords(Iterator<HiveReadableRecord> records) { - this.records = records; - reusableVertexId = getConf().createVertexId(); - reusableMappingTarget = (B) getConf().createMappingTarget(); - reusableEntry = new MappingEntry<>(reusableVertexId, - reusableMappingTarget); - } - - @Override - public boolean hasNext() { - return records.hasNext(); - } - - @Override - public MappingEntry<I, B> next() { - HiveReadableRecord record = records.next(); - I id = getVertexId(record); - B target = getMappingTarget(record); - reusableEntry.setVertexId(id); - reusableEntry.setMappingTarget(target); - return reusableEntry; - } - - /** - * Returns reusableVertexId for use in other methods - * - * @return reusableVertexId - */ - public I getReusableVertexId() { - return reusableVertexId; - } - - /** - * Returns reusableMappingTarget for use in other methods - * - * @return reusableMappingTarget - */ - public B getReusableMappingTarget() { - return reusableMappingTarget; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java deleted file mode 100644 index fc9f9d3..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongByteHiveToMapping.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.mapping.examples; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.input.parser.Records; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; -import org.apache.giraph.hive.input.mapping.SimpleHiveToMapping; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.LongWritable; - -/** - * Long VertexId, Byte MappingTarget implementation of HiveToMapping - */ -public class LongByteHiveToMapping extends SimpleHiveToMapping<LongWritable, - ByteWritable> { - - @Override - public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { - Records.verifyType(0, HiveType.LONG, schema); - Records.verifyType(1, HiveType.BYTE, schema); - } - - @Override - public LongWritable getVertexId(HiveReadableRecord record) { - LongWritable reusableId = getReusableVertexId(); - reusableId.set(record.getLong(0)); - return reusableId; - } - - @Override - public ByteWritable getMappingTarget(HiveReadableRecord record) { - ByteWritable reusableTarget = getReusableMappingTarget(); - reusableTarget.set(record.getByte(1)); - return reusableTarget; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java deleted file mode 100644 index 617bc4f..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/LongInt2ByteHiveToMapping.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.mapping.examples; - -import com.facebook.hiveio.common.HiveType; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.input.parser.Records; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; -import org.apache.giraph.hive.input.mapping.SimpleHiveToMapping; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.LongWritable; - -import java.util.Iterator; - -/** - * Long VertexId, Int Mapping target -> Byte MappingTarget - * implementation of HiveToMapping - * - * The input table has long id, int bucket value - * we need to translate this to long id & byte bucket value - */ -public class LongInt2ByteHiveToMapping extends SimpleHiveToMapping<LongWritable, - ByteWritable> { - - /** Number of workers for the job */ - private int numWorkers = 0; - - @Override - public void initializeRecords(Iterator<HiveReadableRecord> records) { - super.initializeRecords(records); - numWorkers = getConf().getMaxWorkers(); - if (numWorkers <= 0 || numWorkers >= 255) { - throw new IllegalStateException("#workers should be > 0 & < 255"); - } - } - - @Override - public void checkInput(HiveInputDescription inputDesc, - HiveTableSchema schema) { - Records.verifyType(0, HiveType.LONG, schema); - Records.verifyType(1, HiveType.INT, schema); - } - - @Override - public LongWritable getVertexId(HiveReadableRecord record) { - long id = record.getLong(0); - LongWritable reusableId = getReusableVertexId(); - reusableId.set(id); - return reusableId; - } - - @Override - public ByteWritable getMappingTarget(HiveReadableRecord record) { - int target = record.getInt(1); - ByteWritable reusableTarget = getReusableMappingTarget(); - int bVal = target % numWorkers; - if ((bVal >>> 8) != 0) { - throw new IllegalStateException("target % numWorkers overflows " + - "byte range"); - } - reusableTarget.set((byte) bVal); - return reusableTarget; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java deleted file mode 100644 index 41afed6..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/examples/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Sample implementations of HiveToMapping interface - */ -package org.apache.giraph.hive.input.mapping.examples; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java deleted file mode 100644 index c7ad2a3..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Hive Mapping input related classes - */ -package org.apache.giraph.hive.input.mapping; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java deleted file mode 100644 index 7f4e5d6..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Hive input things. - */ -package org.apache.giraph.hive.input; http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java deleted file mode 100644 index 87f3cfe..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/AbstractHiveToVertex.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.vertex; - -import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Base class for HiveToVertex implementations - * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - */ -public abstract class AbstractHiveToVertex<I extends WritableComparable, - V extends Writable, E extends Writable> - extends DefaultConfigurableAndTableSchemaAware<I, V, E> - implements HiveToVertex<I, V, E> { - @Override - public final void remove() { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java deleted file mode 100644 index ad2c244..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.vertex; - -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.hive.input.HiveInputChecker; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.facebook.hiveio.record.HiveReadableRecord; - -import java.util.Iterator; - -/** - * An interface used to create vertices from Hive records. - * - * It gets initialized with HiveRecord iterator, and it needs to provide an - * iterator over vertices, so it's possible to skip some rows from the input, - * combine several rows together, etc. - * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - */ -public interface HiveToVertex<I extends WritableComparable, - V extends Writable, E extends Writable> extends - Iterator<Vertex<I, V, E>>, HiveInputChecker { - /** - * Set the records which contain vertex input data - * - * @param records Hive records - */ - void initializeRecords(Iterator<HiveReadableRecord> records); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java deleted file mode 100644 index 499d839..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.vertex; - -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.hive.common.GiraphHiveConstants; -import org.apache.giraph.io.VertexInputFormat; -import org.apache.giraph.io.VertexReader; -import org.apache.giraph.io.iterables.VertexReaderWrapper; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import com.facebook.hiveio.input.HiveApiInputFormat; -import com.facebook.hiveio.input.HiveInputDescription; -import com.facebook.hiveio.record.HiveReadableRecord; -import com.facebook.hiveio.schema.HiveTableSchema; - -import java.io.IOException; -import java.util.List; - -import static org.apache.giraph.hive.common.HiveUtils.newHiveToVertex; - -/** - * {@link VertexInputFormat} for reading vertices from Hive. - * - * @param <I> Vertex id - * @param <V> Vertex value - * @param <E> Edge value - */ -public class HiveVertexInputFormat<I extends WritableComparable, - V extends Writable, E extends Writable> - extends VertexInputFormat<I, V, E> { - /** Underlying Hive InputFormat used */ - private final HiveApiInputFormat hiveInputFormat; - - /** - * Create vertex input format - */ - public HiveVertexInputFormat() { - hiveInputFormat = new HiveApiInputFormat(); - } - - @Override - public void checkInputSpecs(Configuration conf) { - HiveInputDescription inputDesc = - GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf); - HiveTableSchema schema = getTableSchema(); - HiveToVertex<I, V, E> hiveToVertex = newHiveToVertex(getConf(), schema); - hiveToVertex.checkInput(inputDesc, schema); - } - - @Override - public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) { - super.setConf(conf); - hiveInputFormat.initialize( - GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf), - GiraphHiveConstants.HIVE_VERTEX_INPUT.getProfileID(conf), - conf); - } - - @Override - public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) - throws IOException, InterruptedException { - return hiveInputFormat.getSplits(context); - } - - @Override - public VertexReader<I, V, E> createVertexReader(InputSplit split, - TaskAttemptContext context) throws IOException { - HiveVertexReader<I, V, E> reader = new HiveVertexReader<I, V, E>(); - reader.setTableSchema(getTableSchema()); - - RecordReader<WritableComparable, HiveReadableRecord> baseReader; - try { - baseReader = hiveInputFormat.createRecordReader(split, context); - } catch (InterruptedException e) { - throw new IOException("Could not create vertex reader", e); - } - - reader.setHiveRecordReader(baseReader); - return new VertexReaderWrapper<I, V, E>(reader); - } - - /** - * Get Hive table schema - * - * @return Hive table schema - */ - private HiveTableSchema getTableSchema() { - return hiveInputFormat.getTableSchema(getConf()); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/ad27a291/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java deleted file mode 100644 index 679a3e8..0000000 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.hive.input.vertex; - -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware; -import org.apache.giraph.hive.common.HiveUtils; -import org.apache.giraph.hive.input.RecordReaderWrapper; -import org.apache.giraph.io.iterables.GiraphReader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import com.facebook.hiveio.record.HiveReadableRecord; - -import java.io.IOException; - -/** - * VertexReader using Hive - * - * @param <I> Vertex ID - * @param <V> Vertex Value - * @param <E> Edge Value - */ -public class HiveVertexReader<I extends WritableComparable, - V extends Writable, E extends Writable> - extends DefaultConfigurableAndTableSchemaAware<I, V, E> - implements GiraphReader<Vertex<I, V, E>> { - /** Underlying Hive RecordReader used */ - private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader; - - /** - * {@link HiveToVertex} chosen by user, - * or {@link SimpleHiveToVertex} if none specified - */ - private HiveToVertex<I, V, E> hiveToVertex; - - /** - * Get underlying Hive record reader used. - * - * @return RecordReader from Hive. - */ - public RecordReader<WritableComparable, HiveReadableRecord> - getHiveRecordReader() { - return hiveRecordReader; - } - - /** - * Set underlying Hive record reader used. - * - * @param hiveRecordReader RecordReader to read from Hive. - */ - public void setHiveRecordReader( - RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) { - this.hiveRecordReader = hiveRecordReader; - } - - @Override - public void initialize(InputSplit inputSplit, - TaskAttemptContext context) throws IOException, InterruptedException { - hiveRecordReader.initialize(inputSplit, context); - hiveToVertex = HiveUtils.newHiveToVertex(getConf(), getTableSchema()); - hiveToVertex.initializeRecords( - new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader)); - } - - @Override - public void close() throws IOException { - hiveRecordReader.close(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return hiveRecordReader.getProgress(); - } - - @Override - public boolean hasNext() { - return hiveToVertex.hasNext(); - } - - @Override - public Vertex<I, V, E> next() { - return hiveToVertex.next(); - } - - @Override - public void remove() { - hiveToVertex.remove(); - } -}
