Dear all,

A patch for a crude Gora backend implementation is attached. I copy-pasted the HBase implementation and modified it.

I have some questions before pushing it further:

- HBaseSourceTarget implements TableSource<..., ...>, but GoraSourceTarget implements Source<Pair<K, V>>; a Gora DataStore is a map, not a multimap. Should it be a TableSource anyway?

- I made simple examples in GoraSourceIT (they will be removed; there are no proper tests yet). You can read from and write to a GoraSourceTarget when using MemPipeline, but MRPipeline gives the following error when reading from a Gora MemStore (GoraSourceIT.testGoraTarget()):

  1035 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  2205 [Thread-2] WARN org.apache.hadoop.mapreduce.JobSubmitter - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
  2207 [Thread-2] WARN org.apache.hadoop.mapreduce.JobSubmitter - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
  2925 [Thread-2] INFO org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob - Running job "org.apache.crunch.io.gora.GoraSourceIT: GoraDataStore(org.apache.gora.memory.store.MemStore@2b3b2... ID=1 (1/1)"
  2925 [Thread-2] INFO org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob - Job status available at: http://localhost:8080/
  java.util.NoSuchElementException
      at java.util.TreeMap.key(TreeMap.java:1221)
      at java.util.TreeMap.firstKey(TreeMap.java:285)
      at org.apache.gora.memory.store.MemStore.execute(MemStore.java:125)
      at org.apache.gora.query.impl.QueryBase.execute(QueryBase.java:73)
      at org.apache.gora.mapreduce.GoraRecordReader.executeQuery(GoraRecordReader.java:68)
      at org.apache.gora.mapreduce.GoraRecordReader.nextKeyValue(GoraRecordReader.java:110)
      at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:532)
      at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
      at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
      at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
      at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
      at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
      at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:235)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)

- Should there be an equivalent to HBaseTypes.puts and HBaseTypes.deletes with Gora?

- When Crunch was imported into Eclipse, the following problem appeared in crunch-hbase/pom.xml:

  Plugin execution not covered by lifecycle configuration: org.apache.maven.plugins:maven-dependency-plugin:2.8:build-classpath (execution: create-mrapp-generated-classpath, phase: generate-test-resources)

  What could be the reason? (For the moment I let Eclipse fix the problem automatically.)

- More generally, what about code quality? (still junior...)

I don't know whether this is headed in the right direction, so thanks in advance for your guidance.

Vincent
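A note on the stack trace in the message: its top frames are TreeMap.firstKey() called from MemStore.execute(), and java.util.TreeMap.firstKey() is documented to throw NoSuchElementException when the map is empty. One plausible but unverified reading is that the MemStore instance queried inside the map task holds no entries. The minimal sketch below reproduces only the TreeMap behaviour; the class EmptyTreeMapDemo and its helpers are hypothetical names for illustration and are not part of Gora, Crunch, or the patch.

```java
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Hypothetical demo class; it only exercises java.util.TreeMap.
class EmptyTreeMapDemo {

    // Returns the smallest key, or null when the map has no entries.
    static <K, V> K firstKeyOrNull(TreeMap<K, V> map) {
        return map.isEmpty() ? null : map.firstKey();
    }

    // Returns true if calling firstKey() on this map throws NoSuchElementException.
    static <K, V> boolean firstKeyThrows(TreeMap<K, V> map) {
        try {
            map.firstKey();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> empty = new TreeMap<>();
        TreeMap<Long, String> filled = new TreeMap<>();
        filled.put(456L, "b");
        filled.put(123L, "a");

        System.out.println(firstKeyThrows(empty));   // prints "true"
        System.out.println(firstKeyThrows(filled));  // prints "false"
        System.out.println(firstKeyOrNull(empty));   // prints "null"
        System.out.println(firstKeyOrNull(filled));  // prints "123"
    }
}
```

If the empty-store reading is right, a guard like firstKeyOrNull would only hide the symptom; the question to check would be why the store seen by the record reader has no data.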
From 631d39adfc8491a78e921934de963a87e645cc2a Mon Sep 17 00:00:00 2001
From: Vincent Fabro <vincent.fabro.nu...@gmail.com>
Date: Thu, 21 May 2015 03:18:04 +0200
Subject: [PATCH] CRUNCH-184 Gora backend implementation: first try

---
 crunch-gora/pom.xml | 202 ++++++++++
 .../it/java/com/example/goratest/MetricDatum.java | 425 +++++++++++++++++++++
 .../org/apache/crunch/io/gora/GoraSourceIT.java | 134 +++++++
 crunch-gora/src/it/resources/log4j.properties | 29 ++
 .../java/org/apache/crunch/io/gora/AtGora.java | 36 ++
 .../java/org/apache/crunch/io/gora/FromGora.java | 35 ++
 .../java/org/apache/crunch/io/gora/GoraData.java | 65 ++++
 .../org/apache/crunch/io/gora/GoraIterable.java | 47 +++
 .../org/apache/crunch/io/gora/GoraIterator.java | 69 ++++
 .../apache/crunch/io/gora/GoraPairConverter.java | 69 ++++
 .../apache/crunch/io/gora/GoraSourceTarget.java | 128 +++++++
 .../java/org/apache/crunch/io/gora/GoraTarget.java | 110 ++++++
 .../apache/crunch/io/gora/GoraValueConverter.java | 68 ++++
 .../java/org/apache/crunch/io/gora/ToGora.java | 35 ++
 pom.xml | 1 +
 15 files changed, 1453 insertions(+)
 create mode 100644 crunch-gora/pom.xml
 create mode 100644 crunch-gora/src/it/java/com/example/goratest/MetricDatum.java
 create mode 100644 crunch-gora/src/it/java/org/apache/crunch/io/gora/GoraSourceIT.java
 create mode 100644 crunch-gora/src/it/resources/log4j.properties
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/AtGora.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/FromGora.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraData.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterable.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterator.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraPairConverter.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraSourceTarget.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraTarget.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraValueConverter.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/ToGora.java

diff --git a/crunch-gora/pom.xml b/crunch-gora/pom.xml
new file mode 100644
index 0000000..9f2f160
--- /dev/null
+++ b/crunch-gora/pom.xml
@@ -0,0 +1,202 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  you under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License.
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-parent</artifactId> + <version>0.13.0-SNAPSHOT</version> + </parent> + + <artifactId>crunch-gora</artifactId> + <name>Apache Crunch Gora Support</name> + + <dependencies> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-core</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + <version>0.6</version> + </dependency> + + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-test</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <profiles> + <profile> + <id>hadoop-1</id> + <activation> + <property> + <name>!crunch.platform</name> + </property> + </activation> + </profile> + 
<profile> + <id>hadoop-2</id> + <activation> + <property> + <name>crunch.platform</name> + <value>2</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>create-mrapp-generated-classpath</id> + <phase>generate-test-resources</phase> + <goals> + <goal>build-classpath</goal> + </goals> + <configuration> + <!-- needed to run the unit test for DS to generate the required classpath + that is required in the env of the launch container in the mini mr/yarn cluster --> + <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + </plugins> + <pluginManagement> + <plugins> + <!--This plugin's configuration is used to store Eclipse m2e settings only. 
It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId> + org.apache.maven.plugins + </groupId> + <artifactId> + maven-dependency-plugin + </artifactId> + <versionRange> + [2.8,) + </versionRange> + <goals> + <goal>build-classpath</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + +</project> diff --git a/crunch-gora/src/it/java/com/example/goratest/MetricDatum.java b/crunch-gora/src/it/java/com/example/goratest/MetricDatum.java new file mode 100644 index 0000000..3ff92e4 --- /dev/null +++ b/crunch-gora/src/it/java/com/example/goratest/MetricDatum.java @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.example.goratest; +@SuppressWarnings("all") +public class MetricDatum extends org.apache.gora.persistency.impl.PersistentBase implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricDatum\",\"namespace\":\"com.example.goratest\",\"fields\":[{\"name\":\"metricDimension\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp\",\"type\":\"long\",\"default\":0},{\"name\":\"metric\",\"type\":\"long\",\"default\":0}],\"default\":null}"); + + /** Enum containing all data bean's fields. */ + public static enum Field { + METRIC_DIMENSION(0, "metricDimension"), + TIMESTAMP(1, "timestamp"), + METRIC(2, "metric"), + ; + /** + * Field's index. + */ + private int index; + + /** + * Field's name. + */ + private String name; + + /** + * Field's constructor + * @param index field's index. + * @param name field's name. + */ + Field(int index, String name) {this.index=index;this.name=name;} + + /** + * Gets field's index. + * @return int field's index. + */ + public int getIndex() {return index;} + + /** + * Gets field's name. + * @return String field's name. + */ + public String getName() {return name;} + + /** + * Gets field's attributes to string. + * @return String field's attributes to string. + */ + public String toString() {return name;} + }; + + public static final String[] _ALL_FIELDS = { + "metricDimension", + "timestamp", + "metric", + }; + + /** + * Gets the total field count. + * @return int field count + */ + public int getFieldsCount() { + return MetricDatum._ALL_FIELDS.length; + } + + private java.lang.CharSequence metricDimension; + private long timestamp; + private long metric; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. 
+ public java.lang.Object get(int field$) { + switch (field$) { + case 0: return metricDimension; + case 1: return timestamp; + case 2: return metric; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value) { + switch (field$) { + case 0: metricDimension = (java.lang.CharSequence)(value); break; + case 1: timestamp = (java.lang.Long)(value); break; + case 2: metric = (java.lang.Long)(value); break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'metricDimension' field. + */ + public java.lang.CharSequence getMetricDimension() { + return metricDimension; + } + + /** + * Sets the value of the 'metricDimension' field. + * @param value the value to set. + */ + public void setMetricDimension(java.lang.CharSequence value) { + this.metricDimension = value; + setDirty(0); + } + + /** + * Checks the dirty status of the 'metricDimension' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isMetricDimensionDirty() { + return isDirty(0); + } + + /** + * Gets the value of the 'timestamp' field. + */ + public java.lang.Long getTimestamp() { + return timestamp; + } + + /** + * Sets the value of the 'timestamp' field. + * @param value the value to set. + */ + public void setTimestamp(java.lang.Long value) { + this.timestamp = value; + setDirty(1); + } + + /** + * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isTimestampDirty() { + return isDirty(1); + } + + /** + * Gets the value of the 'metric' field. 
+ */ + public java.lang.Long getMetric() { + return metric; + } + + /** + * Sets the value of the 'metric' field. + * @param value the value to set. + */ + public void setMetric(java.lang.Long value) { + this.metric = value; + setDirty(2); + } + + /** + * Checks the dirty status of the 'metric' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isMetricDirty() { + return isDirty(2); + } + + /** Creates a new MetricDatum RecordBuilder */ + public static com.example.goratest.MetricDatum.Builder newBuilder() { + return new com.example.goratest.MetricDatum.Builder(); + } + + /** Creates a new MetricDatum RecordBuilder by copying an existing Builder */ + public static com.example.goratest.MetricDatum.Builder newBuilder(com.example.goratest.MetricDatum.Builder other) { + return new com.example.goratest.MetricDatum.Builder(other); + } + + /** Creates a new MetricDatum RecordBuilder by copying an existing MetricDatum instance */ + public static com.example.goratest.MetricDatum.Builder newBuilder(com.example.goratest.MetricDatum other) { + return new com.example.goratest.MetricDatum.Builder(other); + } + + private static java.nio.ByteBuffer deepCopyToReadOnlyBuffer( + java.nio.ByteBuffer input) { + java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(input.capacity()); + int position = input.position(); + input.reset(); + int mark = input.position(); + int limit = input.limit(); + input.rewind(); + input.limit(input.capacity()); + copy.put(input); + input.rewind(); + copy.rewind(); + input.position(mark); + input.mark(); + copy.position(mark); + copy.mark(); + input.position(position); + copy.position(position); + input.limit(limit); + copy.limit(limit); + return copy.asReadOnlyBuffer(); + } + + /** + * RecordBuilder for MetricDatum instances. 
+ */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<MetricDatum> + implements org.apache.avro.data.RecordBuilder<MetricDatum> { + + private java.lang.CharSequence metricDimension; + private long timestamp; + private long metric; + + /** Creates a new Builder */ + private Builder() { + super(com.example.goratest.MetricDatum.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(com.example.goratest.MetricDatum.Builder other) { + super(other); + } + + /** Creates a Builder by copying an existing MetricDatum instance */ + private Builder(com.example.goratest.MetricDatum other) { + super(com.example.goratest.MetricDatum.SCHEMA$); + if (isValidValue(fields()[0], other.metricDimension)) { + this.metricDimension = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.metricDimension); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.timestamp)) { + this.timestamp = (java.lang.Long) data().deepCopy(fields()[1].schema(), other.timestamp); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.metric)) { + this.metric = (java.lang.Long) data().deepCopy(fields()[2].schema(), other.metric); + fieldSetFlags()[2] = true; + } + } + + /** Gets the value of the 'metricDimension' field */ + public java.lang.CharSequence getMetricDimension() { + return metricDimension; + } + + /** Sets the value of the 'metricDimension' field */ + public com.example.goratest.MetricDatum.Builder setMetricDimension(java.lang.CharSequence value) { + validate(fields()[0], value); + this.metricDimension = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'metricDimension' field has been set */ + public boolean hasMetricDimension() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'metricDimension' field */ + public com.example.goratest.MetricDatum.Builder clearMetricDimension() { + metricDimension = null; + fieldSetFlags()[0] = 
false; + return this; + } + + /** Gets the value of the 'timestamp' field */ + public java.lang.Long getTimestamp() { + return timestamp; + } + + /** Sets the value of the 'timestamp' field */ + public com.example.goratest.MetricDatum.Builder setTimestamp(long value) { + validate(fields()[1], value); + this.timestamp = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'timestamp' field has been set */ + public boolean hasTimestamp() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'timestamp' field */ + public com.example.goratest.MetricDatum.Builder clearTimestamp() { + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'metric' field */ + public java.lang.Long getMetric() { + return metric; + } + + /** Sets the value of the 'metric' field */ + public com.example.goratest.MetricDatum.Builder setMetric(long value) { + validate(fields()[2], value); + this.metric = value; + fieldSetFlags()[2] = true; + return this; + } + + /** Checks whether the 'metric' field has been set */ + public boolean hasMetric() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'metric' field */ + public com.example.goratest.MetricDatum.Builder clearMetric() { + fieldSetFlags()[2] = false; + return this; + } + + @Override + public MetricDatum build() { + try { + MetricDatum record = new MetricDatum(); + record.metricDimension = fieldSetFlags()[0] ? this.metricDimension : (java.lang.CharSequence) defaultValue(fields()[0]); + record.timestamp = fieldSetFlags()[1] ? this.timestamp : (java.lang.Long) defaultValue(fields()[1]); + record.metric = fieldSetFlags()[2] ? 
this.metric : (java.lang.Long) defaultValue(fields()[2]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + public MetricDatum.Tombstone getTombstone(){ + return TOMBSTONE; + } + + public MetricDatum newInstance(){ + return newBuilder().build(); + } + + private static final Tombstone TOMBSTONE = new Tombstone(); + + public static final class Tombstone extends MetricDatum implements org.apache.gora.persistency.Tombstone { + + private Tombstone() { } + + /** + * Gets the value of the 'metricDimension' field. + */ + public java.lang.CharSequence getMetricDimension() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'metricDimension' field. + * @param value the value to set. + */ + public void setMetricDimension(java.lang.CharSequence value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'metricDimension' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isMetricDimensionDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'timestamp' field. + */ + public java.lang.Long getTimestamp() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'timestamp' field. + * @param value the value to set. + */ + public void setTimestamp(java.lang.Long value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. 
+ */ + public boolean isTimestampDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'metric' field. + */ + public java.lang.Long getMetric() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'metric' field. + * @param value the value to set. + */ + public void setMetric(java.lang.Long value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'metric' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isMetricDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + + } + +} + diff --git a/crunch-gora/src/it/java/org/apache/crunch/io/gora/GoraSourceIT.java b/crunch-gora/src/it/java/org/apache/crunch/io/gora/GoraSourceIT.java new file mode 100644 index 0000000..7df2484 --- /dev/null +++ b/crunch-gora/src/it/java/org/apache/crunch/io/gora/GoraSourceIT.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Source; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.types.avro.Avros; +import org.apache.gora.memory.store.MemStore; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.util.GoraException; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import com.example.goratest.MetricDatum; + +public class GoraSourceIT { + + @Test + public void testGoraSource() throws GoraException { + DataStore<Long, MetricDatum> dataStore = DataStoreFactory.createDataStore(MemStore.class, Long.class, + MetricDatum.class, new Configuration()); + dataStore.put(123L, MetricDatum.newBuilder().build()); + dataStore.put(456L, MetricDatum.newBuilder().build()); + Source<Pair<Long, MetricDatum>> source = FromGora.store(dataStore, Long.MIN_VALUE, Long.MAX_VALUE, + Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class))); + Pipeline pipeline = MemPipeline.getInstance(); + PCollection<Pair<Long, MetricDatum>> res = pipeline.read(source); + for (Pair<Long, MetricDatum> pair : res.materialize()) { + System.out.println("key: " + pair.first()); + System.out.println("value: " + pair.second()); + } + pipeline.done(); + System.out.println("length: " + res.length()); + System.out.println("Done testGoraSource**************************************\n"); + } + + @Test + public void testGoraSource2() throws GoraException { + DataStore<Long, MetricDatum> dataStore = DataStoreFactory.createDataStore(MemStore.class, Long.class, + MetricDatum.class, new Configuration()); + 
dataStore.put(123L, MetricDatum.newBuilder().setMetric(700).build()); + dataStore.put(456L, MetricDatum.newBuilder().setMetric(500).build()); + Source<Pair<Long, MetricDatum>> source = FromGora.store(dataStore, Long.MIN_VALUE, Long.MAX_VALUE, + Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class))); + Pipeline pipeline = MemPipeline.getInstance(); + PCollection<Pair<Long, MetricDatum>> res = pipeline.read(source); + PCollection<Long> metrics = res.parallelDo(new DoubleMetric(), Avros.longs()); + System.out.println("metric values..."); + for (Long metric : metrics.materialize()) { + System.out.println("metric: " + metric); + } + pipeline.done(); + System.out.println("Done testGoraSource2**************************************\n"); + } + + public static class DoubleMetric extends DoFn<Pair<Long, MetricDatum>, Long> { + private static final long serialVersionUID = 1L; + + @Override + public void process(Pair<Long, MetricDatum> input, Emitter<Long> emitter) { + emitter.emit(input.second().getMetric() * 2); + } + } + + @Test + public void testGoraTarget() throws GoraException { + DataStore<Long, MetricDatum> dataStore = DataStoreFactory.createDataStore(MemStore.class, Long.class, + MetricDatum.class, new Configuration()); + dataStore.put(123L, MetricDatum.newBuilder().setMetric(700).build()); + dataStore.put(456L, MetricDatum.newBuilder().setMetric(500).build()); + SourceTarget<Pair<Long, MetricDatum>> sourceTarget = AtGora.store(dataStore, Long.MIN_VALUE, Long.MAX_VALUE, + Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class))); + Pipeline pipeline = new MRPipeline(GoraSourceIT.class, new Configuration()); + PCollection<Pair<Long, MetricDatum>> res = pipeline.read(sourceTarget); + PCollection<Pair<Long, MetricDatum>> metrics = res.parallelDo(new ModifMetric(), + Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class))); + pipeline.write(metrics, sourceTarget); + pipeline.done(); + System.out.println("length: " + res.length()); + System.out.println("Done 
testGoraTarget**************************************\n"); + } + + @Test + public void testGoraTarget2() throws GoraException { + DataStore<Long, MetricDatum> dataStore = DataStoreFactory.createDataStore(MemStore.class, Long.class, + MetricDatum.class, new Configuration()); + dataStore.put(123L, MetricDatum.newBuilder().setMetric(700).build()); + dataStore.put(456L, MetricDatum.newBuilder().setMetric(500).build()); + SourceTarget<Pair<Long, MetricDatum>> sourceTarget = AtGora.store(dataStore, Long.MIN_VALUE, Long.MAX_VALUE, + Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class))); + Pipeline pipeline = MemPipeline.getInstance(); + PCollection<Pair<Long, MetricDatum>> res = pipeline.read(sourceTarget); + PCollection<Pair<Long, MetricDatum>> metrics = res.parallelDo(new ModifMetric(), + Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class))); + pipeline.write(metrics, sourceTarget); + pipeline.done(); + System.out.println("length: " + res.length()); + System.out.println("Done testGoraTarget2**************************************\n"); + } + + public static class ModifMetric extends DoFn<Pair<Long, MetricDatum>, Pair<Long, MetricDatum>> { + private static final long serialVersionUID = 1L; + + @Override + public void process(Pair<Long, MetricDatum> input, Emitter<Pair<Long, MetricDatum>> emitter) { + Long newMetric = input.second().getMetric() * 3; + input.second().setMetric(newMetric); + emitter.emit(input); + } + } +} diff --git a/crunch-gora/src/it/resources/log4j.properties b/crunch-gora/src/it/resources/log4j.properties new file mode 100644 index 0000000..5d144a0 --- /dev/null +++ b/crunch-gora/src/it/resources/log4j.properties @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ***** Set root logger level to INFO and its only appender to A. +log4j.logger.org.apache.crunch=info, A + +# Log warnings on Hadoop for the local runner when testing +log4j.logger.org.apache.hadoop=warn, A +# Except for Configuration, which is chatty. +log4j.logger.org.apache.hadoop.conf.Configuration=error, A + +# ***** A is set to be a ConsoleAppender. +log4j.appender.A=org.apache.log4j.ConsoleAppender +# ***** A uses PatternLayout. +log4j.appender.A.layout=org.apache.log4j.PatternLayout +log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/AtGora.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/AtGora.java new file mode 100644 index 0000000..933ac3b --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/AtGora.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import org.apache.crunch.Pair; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.types.PType; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; + +/** + * Static factory methods for creating Gora {@link SourceTarget} types. + */ +public class AtGora { + + public static <K, V extends Persistent> GoraSourceTarget<K, V> store(DataStore<K, V> dataStore, K startKey, K endKey, + PType<Pair<K, V>> ptype) { + return new GoraSourceTarget<K, V>(dataStore, startKey, endKey, ptype); + } + +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/FromGora.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/FromGora.java new file mode 100644 index 0000000..27c3f04 --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/FromGora.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import org.apache.crunch.Pair; +import org.apache.crunch.Source; +import org.apache.crunch.types.PType; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; + +/** + * Static factory methods for creating Gora {@link Source} types. + */ +public class FromGora { + + public static <K, V extends Persistent> Source<Pair<K, V>> store(DataStore<K, V> dataStore, K startKey, K endKey, + PType<Pair<K, V>> ptype) { + return new GoraSourceTarget<K, V>(dataStore, startKey, endKey, ptype); + } +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraData.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraData.java new file mode 100644 index 0000000..7b70452 --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraData.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import java.io.IOException; +import java.util.Set; + +import org.apache.crunch.Pair; +import org.apache.crunch.ReadableData; +import org.apache.crunch.SourceTarget; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; + +import com.google.common.collect.ImmutableSet; + +public class GoraData<K, V extends Persistent> implements ReadableData<Pair<K, V>> { + + private DataStore<K, V> dataStore; + private K startKey; + private K endKey; + private transient SourceTarget<?> parent; + + public GoraData(DataStore<K, V> dataStore, K startKey, K endKey, SourceTarget<?> parent) { + this.dataStore = dataStore; + this.startKey = startKey; + this.endKey = endKey; + this.parent = parent; + } + + @Override + public Set<SourceTarget<?>> getSourceTargets() { + if (parent != null) { + return ImmutableSet.<SourceTarget<?>> of(parent); + } else { + return ImmutableSet.of(); + } + } + + @Override + public void configure(Configuration conf) { + // No-op + } + + @Override + public Iterable<Pair<K, V>> read(TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException { + return new GoraIterable<K, V>(dataStore, startKey, endKey); + } +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterable.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterable.java new file mode 100644 index 0000000..36aa28a --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterable.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import java.util.Iterator; + +import org.apache.crunch.Pair; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; + +class GoraIterable<K, V extends Persistent> implements Iterable<Pair<K, V>> { + + private DataStore<K, V> dataStore; + private K startKey; + private K endKey; + + public GoraIterable(DataStore<K, V> dataStore, K startKey, K endKey) { + this.dataStore = dataStore; + this.startKey = startKey; + this.endKey = endKey; + } + + @Override + public Iterator<Pair<K, V>> iterator() { + Query<K, V> query = dataStore.newQuery(); + query.setKeyRange(startKey, endKey); + Result<K, V> result = query.execute(); + return new GoraIterator<K, V>(result); + } +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterator.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterator.java new file mode 100644 index 0000000..ef6b26f --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterator.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.Pair; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Result; + +class GoraIterator<K, V extends Persistent> implements Iterator<Pair<K, V>> { + + private Result<K, V> result; + private boolean hasNext; + + public GoraIterator(Result<K, V> result) { + this.result = result; + try { + hasNext = result.next(); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public Pair<K, V> next() { + if (!hasNext()) + throw new NoSuchElementException("no more elements to iterate"); + Pair<K, V> next = Pair.of(result.getKey(), result.get()); + try { + hasNext = result.next(); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } catch (Exception e) { + throw new CrunchRuntimeException(e); + } + return next; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraPairConverter.java
b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraPairConverter.java new file mode 100644 index 0000000..a92a17c --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraPairConverter.java @@ -0,0 +1,69 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ +package org.apache.crunch.io.gora; + +import org.apache.crunch.Pair; +import org.apache.crunch.types.Converter; + +class GoraPairConverter<K, V> implements Converter<K, V, Pair<K, V>, Pair<K, Iterable<V>>> { + + private Class<K> keyClass; + private Class<V> valueClass; + + public GoraPairConverter(Class<K> keyClass, Class<V> valueClass) { + this.keyClass = keyClass; + this.valueClass = valueClass; + } + + @Override + public Pair<K, V> convertInput(K key, V value) { + return Pair.of(key, value); + } + + @Override + public K outputKey(Pair<K, V> value) { + return value.first(); + } + + @Override + public V outputValue(Pair<K, V> value) { + return value.second(); + } + + @Override + public Class<K> getKeyClass() { + return keyClass; + } + + @Override + public Class<V> getValueClass() { + return valueClass; + } + + @Override + public boolean applyPTypeTransforms() { + return false; + } + + @Override + public Pair<K, Iterable<V>> convertIterableInput(K key, Iterable<V> value) { + return Pair.of(key, value); + } +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraSourceTarget.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraSourceTarget.java new file mode 100644 index 0000000..e18b8b8 --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraSourceTarget.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import java.io.IOException; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.Pair; +import org.apache.crunch.ReadableData; +import org.apache.crunch.Source; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.io.ReadableSourceTarget; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PType; +import org.apache.gora.mapreduce.GoraInputFormat; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.store.DataStore; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GoraSourceTarget<K, V extends Persistent> extends GoraTarget<K, V> implements + ReadableSourceTarget<Pair<K, V>> { + + private static final Logger LOG = LoggerFactory.getLogger(GoraSourceTarget.class); + + private K startKey; + private K endKey; + private PType<Pair<K, V>> ptype; + + public GoraSourceTarget(DataStore<K, V> dataStore, K startKey, K endKey, PType<Pair<K, V>> ptype) { + super(dataStore); + this.startKey = startKey; + this.endKey = endKey; + this.ptype = ptype; + } + + @Override + public Source<Pair<K, V>> inputConf(String key, String value) { + throw new UnsupportedOperationException(); + } + + @Override + public PType<Pair<K, V>> getType() { + return ptype; + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof GoraSourceTarget)) { + return false; + } + 
GoraSourceTarget<?, ?> o = (GoraSourceTarget<?, ?>) other; + if (!super.equals(o)) + return false; + if (!ptype.equals(o.ptype)) + return false; + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder hcb = new HashCodeBuilder(); + return hcb.append(dataStore).append(ptype).toHashCode(); + } + + @Override + public String toString() { + return "GoraDataStore(" + dataStore + ")"; + } + + @Override + public void configureSource(Job job, int inputId) throws IOException { + Query<K, V> query = dataStore.newQuery(); + GoraInputFormat.setInput(job, query, true); + } + + @Override + public long getSize(Configuration conf) { + // TODO something smarter here. + return 1000L * 1000L * 1000L; + } + + @Override + public long getLastModifiedAt(Configuration configuration) { + LOG.warn("Cannot determine last modified time for source: {}", toString()); + return -1; + } + + @Override + public Converter<?, ?, ?, ?> getConverter() { + return new GoraPairConverter<K, V>(dataStore.getKeyClass(), dataStore.getPersistentClass()); + } + + @Override + public Iterable<Pair<K, V>> read(Configuration conf) throws IOException { + return new GoraIterable<K, V>(dataStore, startKey, endKey); + } + + @Override + public ReadableData<Pair<K, V>> asReadable() { + return new GoraData<K, V>(dataStore, startKey, endKey, this); + } + + @Override + public SourceTarget<Pair<K, V>> conf(String key, String value) { + inputConf(key, value); + outputConf(key, value); + return this; + } +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraTarget.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraTarget.java new file mode 100644 index 0000000..96bfcba --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraTarget.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import java.util.Map; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; +import org.apache.crunch.io.MapReduceTarget; +import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PType; +import org.apache.gora.mapreduce.GoraOutputFormat; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class GoraTarget<K, V extends Persistent> implements MapReduceTarget { + + private static final Logger LOG = LoggerFactory.getLogger(GoraTarget.class); + + protected DataStore<K, V> dataStore; + private Map<String, String> extraConf = Maps.newHashMap(); + + public GoraTarget(DataStore<K, V> dataStore) { + this.dataStore = dataStore; + } + + @Override + public boolean equals(Object other) { + if (this == other) + return true; + if (other == null) + return false; + if (!other.getClass().equals(getClass())) + return false; + GoraTarget<?, ?> o = 
(GoraTarget<?, ?>) other; + return dataStore.equals(o.dataStore); + } + + @Override + public int hashCode() { + HashCodeBuilder hcb = new HashCodeBuilder(); + return hcb.append(dataStore).toHashCode(); + } + + @Override + public String toString() { + return "GoraDataStore(" + dataStore + ")"; + } + + @Override + public boolean accept(OutputHandler handler, PType<?> ptype) { + handler.configure(this, ptype); + return true; + } + + @Override + public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { + GoraOutputFormat.setOutput(job, dataStore, true); + final Configuration conf = job.getConfiguration(); + for (Map.Entry<String, String> e : extraConf.entrySet()) { + conf.set(e.getKey(), e.getValue()); + } + } + + @Override + public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) { + return null; + } + + @Override + public Target outputConf(String key, String value) { + extraConf.put(key, value); + return this; + } + + @Override + public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) { + LOG.info("GoraTarget ignores checks for existing outputs..."); + return false; + } + + @Override + public Converter<?, ?, ?, ?> getConverter(final PType<?> ptype) { + return new GoraPairConverter<K, V>(dataStore.getKeyClass(), dataStore.getPersistentClass()); + } +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraValueConverter.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraValueConverter.java new file mode 100644 index 0000000..2f9d709 --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraValueConverter.java @@ -0,0 +1,68 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. 
The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.crunch.io.gora; + +import org.apache.crunch.types.Converter; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.util.Null; + +public class GoraValueConverter<V extends Persistent> implements Converter<Object, V, V, Iterable<V>> { + + private final Class<V> serializationClass; + + public GoraValueConverter(Class<V> serializationClass) { + this.serializationClass = serializationClass; + } + + @Override + public V convertInput(Object key, V value) { + return value; + } + + @Override + public Object outputKey(V value) { + return Null.get(); + } + + @Override + public V outputValue(V value) { + return value; + } + + @Override + public Class<Object> getKeyClass() { + return (Class<Object>) (Class<?>) Null.class; + } + + @Override + public Class<V> getValueClass() { + return serializationClass; + } + + @Override + public boolean applyPTypeTransforms() { + return false; + } + + @Override + public Iterable<V> convertIterableInput(Object key, Iterable<V> value) { + return value; + } +} diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/ToGora.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/ToGora.java new file mode 100644 index 0000000..345d7ec --- /dev/null +++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/ToGora.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.gora; + +import org.apache.crunch.Pair; +import org.apache.crunch.Target; +import org.apache.crunch.types.PType; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; + +/** + * Static factory methods for creating Gora {@link Target} types. + */ +public class ToGora { + + public static <K, V extends Persistent> Target store(DataStore<K, V> dataStore, PType<Pair<K, V>> ptype) { + return new GoraTarget<K, V>(dataStore); + } + +} diff --git a/pom.xml b/pom.xml index d47f6ff..22a1b5c 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ under the License. <modules> <module>crunch-core</module> <module>crunch-hbase</module> + <module>crunch-gora</module> <module>crunch-test</module> <module>crunch-contrib</module> <module>crunch-examples</module> -- 2.1.4
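
A side note on the GoraIterator in the patch above: it adapts a cursor-style Result (advance with next(), then read getKey() and get()) to java.util.Iterator by always staying one step ahead of the caller. Below is a minimal, dependency-free sketch of that look-ahead pattern, for discussion only. The Cursor interface, cursorOver and drain are hypothetical stand-ins invented for this illustration; they are not Gora or Crunch API, and the real Result may behave differently (for example, it can throw checked exceptions).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class LookAheadIteratorSketch {

  /** Hypothetical stand-in for a cursor-style result set: advance first, then read. */
  interface Cursor<K, V> {
    boolean next(); // moves to the next record; false when exhausted
    K getKey();     // key of the current record
    V get();        // value of the current record
  }

  /** Hypothetical in-memory cursor over a sorted map, used only for this demo. */
  static <K, V> Cursor<K, V> cursorOver(TreeMap<K, V> map) {
    final Iterator<Map.Entry<K, V>> it = map.entrySet().iterator();
    return new Cursor<K, V>() {
      private Map.Entry<K, V> current;

      @Override
      public boolean next() {
        if (!it.hasNext()) {
          current = null;
          return false;
        }
        current = it.next();
        return true;
      }

      @Override
      public K getKey() {
        return current.getKey();
      }

      @Override
      public V get() {
        return current.getValue();
      }
    };
  }

  /** Adapts a Cursor to java.util.Iterator by keeping the cursor one step ahead. */
  static class CursorIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final Cursor<K, V> cursor;
    private boolean hasNext;

    CursorIterator(Cursor<K, V> cursor) {
      this.cursor = cursor;
      this.hasNext = cursor.next(); // prime: position on the first record, if any
    }

    @Override
    public boolean hasNext() {
      return hasNext;
    }

    @Override
    public Map.Entry<K, V> next() {
      if (!hasNext) {
        throw new NoSuchElementException("no more elements to iterate");
      }
      // Copy the current record before advancing, since advancing moves the cursor.
      Map.Entry<K, V> entry =
          new AbstractMap.SimpleImmutableEntry<K, V>(cursor.getKey(), cursor.get());
      hasNext = cursor.next();
      return entry;
    }
  }

  /** Collects every record as "key=value", in key order. */
  public static List<String> drain(TreeMap<Long, Long> store) {
    List<String> out = new ArrayList<String>();
    Iterator<Map.Entry<Long, Long>> it = new CursorIterator<Long, Long>(cursorOver(store));
    while (it.hasNext()) {
      Map.Entry<Long, Long> e = it.next();
      out.add(e.getKey() + "=" + e.getValue());
    }
    return out;
  }

  public static void main(String[] args) {
    TreeMap<Long, Long> store = new TreeMap<Long, Long>();
    store.put(123L, 700L);
    store.put(456L, 500L);
    System.out.println(drain(store));
    System.out.println(drain(new TreeMap<Long, Long>())); // empty store yields no records
  }
}
```

In this sketch the record is copied before the cursor advances, which is the same ordering GoraIterator.next() relies on. An empty cursor is handled at construction time, so hasNext() is false from the start and no exception is thrown.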