Dear all

Attached is a patch with a rough first implementation of a Gora backend. I
started from a copy of the HBase implementation and adapted it.

I have a few questions before taking it further:

- HBaseSourceTarget implements TableSource<..., ...>, but GoraSourceTarget
implements Source<Pair<K, V>>, because a Gora DataStore is a map, not a
multimap. Should it be a TableSource anyway?

- I wrote a few simple examples in GoraSourceIT (to be removed; there are no
proper tests yet). Reading from and writing to a GoraSourceTarget works with
MemPipeline, but with MRPipeline, reading from a Gora MemStore fails with the
following error (GoraSourceIT.testGoraTarget()):
1035 [main] WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2205 [Thread-2] WARN  org.apache.hadoop.mapreduce.JobSubmitter  - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2207 [Thread-2] WARN  org.apache.hadoop.mapreduce.JobSubmitter  - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2925 [Thread-2] INFO  org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob  - Running job "org.apache.crunch.io.gora.GoraSourceIT: GoraDataStore(org.apache.gora.memory.store.MemStore@2b3b2... ID=1 (1/1)"
2925 [Thread-2] INFO  org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob  - Job status available at: http://localhost:8080/
java.util.NoSuchElementException
    at java.util.TreeMap.key(TreeMap.java:1221)
    at java.util.TreeMap.firstKey(TreeMap.java:285)
    at org.apache.gora.memory.store.MemStore.execute(MemStore.java:125)
    at org.apache.gora.query.impl.QueryBase.execute(QueryBase.java:73)
    at org.apache.gora.mapreduce.GoraRecordReader.executeQuery(GoraRecordReader.java:68)
    at org.apache.gora.mapreduce.GoraRecordReader.nextKeyValue(GoraRecordReader.java:110)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:532)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

- Should there be a Gora equivalent of HBaseTypes.puts and
HBaseTypes.deletes?

- When I imported Crunch into Eclipse, the following problem appeared in
crunch-hbase/pom.xml:
 Plugin execution not covered by lifecycle configuration:
 org.apache.maven.plugins:maven-dependency-plugin:2.8:build-classpath
 (execution: create-mrapp-generated-classpath, phase: generate-test-resources)
What could be causing this? (For now, I let Eclipse fix the problem
automatically; that is where the org.eclipse.m2e lifecycle-mapping section in
crunch-gora/pom.xml comes from.)

- More generally, how is the code quality? (I am still a junior developer.)
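On the first question (TableSource versus Source<Pair<K, V>>), a small JDK-only sketch of the underlying point: a map with unique keys can always be viewed as a table of (key, value) pairs in which each key happens to appear once, so unique keys alone do not rule out a table-shaped source. No Crunch or Gora classes are used; `MapAsTable` and `asPairs` are hypothetical names for illustration only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapAsTable {
  // Copies a map into a list of (key, value) pairs: the shape of a table,
  // in which each key of the map happens to appear exactly once.
  static <K, V> List<Map.Entry<K, V>> asPairs(Map<K, V> map) {
    List<Map.Entry<K, V>> pairs = new ArrayList<>();
    for (Map.Entry<K, V> e : map.entrySet()) {
      pairs.add(new SimpleEntry<>(e.getKey(), e.getValue()));
    }
    return pairs;
  }

  public static void main(String[] args) {
    // Map semantics, as in a key/value store: a second put on key 123 replaces the first value.
    Map<Long, Long> store = new LinkedHashMap<>();
    store.put(123L, 700L);
    store.put(456L, 500L);
    store.put(123L, 2100L);
    System.out.println(asPairs(store)); // [123=2100, 456=500]

    // Table (multimap) semantics: a list of pairs may also repeat a key.
    List<Map.Entry<Long, Long>> table = asPairs(store);
    table.add(new SimpleEntry<>(123L, 42L));
    System.out.println(table.size()); // 3
  }
}
```

The converse does not hold: a table with repeated keys cannot be written to a map-like store without losing values, which matters more for the target side than for the source side.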
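On the MRPipeline error: the top frames of the trace show that the exception comes from `TreeMap.firstKey()`, which throws `NoSuchElementException` whenever the map is empty. One possibility worth checking (an assumption, not verified against the Gora 0.6 source) is that the MemStore queried inside the map task holds no entries, for example because it is not the same in-memory instance the test filled. The JDK-only sketch below reproduces the behavior and shows a hypothetical guard (`firstKeyOrEmpty` is an illustrative name, not a Gora method).

```java
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.TreeMap;

public class FirstKeyGuard {
  // Hypothetical helper: returns the smallest key, or Optional.empty() if the map has
  // no entries, instead of letting TreeMap.firstKey() throw NoSuchElementException.
  static <K, V> Optional<K> firstKeyOrEmpty(TreeMap<K, V> map) {
    return map.isEmpty() ? Optional.<K>empty() : Optional.of(map.firstKey());
  }

  public static void main(String[] args) {
    TreeMap<Long, String> empty = new TreeMap<>();
    try {
      empty.firstKey();
    } catch (NoSuchElementException e) {
      System.out.println("firstKey() on an empty TreeMap throws NoSuchElementException");
    }
    System.out.println(firstKeyOrEmpty(empty).isPresent()); // false

    TreeMap<Long, String> filled = new TreeMap<>();
    filled.put(456L, "b");
    filled.put(123L, "a");
    System.out.println(firstKeyOrEmpty(filled).get()); // 123
  }
}
```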
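On the HBaseTypes question: HBaseTypes.puts() and HBaseTypes.deletes() provide PTypes for HBase Put and Delete objects, so a pipeline can carry those mutations as collection elements. A Gora counterpart would need some way to express "store this value under this key" versus "delete this key". The sketch below is purely hypothetical: `GoraMutation` is not an existing Crunch or Gora class, and a `java.util.Map` stands in for a Gora DataStore, whose `put(key, value)` and `delete(key)` methods the two cases mirror.

```java
import java.util.HashMap;
import java.util.Map;

public class GoraMutationSketch {
  enum Kind { PUT, DELETE }

  // Hypothetical mutation record (not a real Crunch or Gora class):
  // either "store this value under this key" or "delete this key".
  static final class GoraMutation<K, V> {
    final Kind kind;
    final K key;
    final V value; // null for DELETE

    private GoraMutation(Kind kind, K key, V value) {
      this.kind = kind;
      this.key = key;
      this.value = value;
    }

    static <A, B> GoraMutation<A, B> put(A key, B value) {
      return new GoraMutation<A, B>(Kind.PUT, key, value);
    }

    static <A, B> GoraMutation<A, B> delete(A key) {
      return new GoraMutation<A, B>(Kind.DELETE, key, null);
    }

    // Applies the mutation to a Map standing in for a Gora DataStore,
    // mirroring DataStore.put(key, value) and DataStore.delete(key).
    void applyTo(Map<K, V> store) {
      if (kind == Kind.DELETE) {
        store.remove(key);
      } else {
        store.put(key, value);
      }
    }
  }

  public static void main(String[] args) {
    Map<Long, Long> store = new HashMap<>();
    GoraMutation.<Long, Long>put(123L, 700L).applyTo(store);
    GoraMutation.<Long, Long>put(456L, 500L).applyTo(store);
    GoraMutation.<Long, Long>delete(123L).applyTo(store);
    System.out.println(store); // {456=500}
  }
}
```

Using an explicit `Kind` rather than "null value means delete" keeps the two cases unambiguous even if a store ever accepts null values.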

I am not sure this is heading in the right direction, so thanks in advance
for your guidance.

Vincent
From 631d39adfc8491a78e921934de963a87e645cc2a Mon Sep 17 00:00:00 2001
From: Vincent Fabro <vincent.fabro.nu...@gmail.com>
Date: Thu, 21 May 2015 03:18:04 +0200
Subject: [PATCH] CRUNCH-184 Gora backend implementation: first try

---
 crunch-gora/pom.xml                                | 202 ++++++++++
 .../it/java/com/example/goratest/MetricDatum.java  | 425 +++++++++++++++++++++
 .../org/apache/crunch/io/gora/GoraSourceIT.java    | 134 +++++++
 crunch-gora/src/it/resources/log4j.properties      |  29 ++
 .../java/org/apache/crunch/io/gora/AtGora.java     |  36 ++
 .../java/org/apache/crunch/io/gora/FromGora.java   |  35 ++
 .../java/org/apache/crunch/io/gora/GoraData.java   |  65 ++++
 .../org/apache/crunch/io/gora/GoraIterable.java    |  47 +++
 .../org/apache/crunch/io/gora/GoraIterator.java    |  69 ++++
 .../apache/crunch/io/gora/GoraPairConverter.java   |  69 ++++
 .../apache/crunch/io/gora/GoraSourceTarget.java    | 128 +++++++
 .../java/org/apache/crunch/io/gora/GoraTarget.java | 110 ++++++
 .../apache/crunch/io/gora/GoraValueConverter.java  |  68 ++++
 .../java/org/apache/crunch/io/gora/ToGora.java     |  35 ++
 pom.xml                                            |   1 +
 15 files changed, 1453 insertions(+)
 create mode 100644 crunch-gora/pom.xml
 create mode 100644 crunch-gora/src/it/java/com/example/goratest/MetricDatum.java
 create mode 100644 crunch-gora/src/it/java/org/apache/crunch/io/gora/GoraSourceIT.java
 create mode 100644 crunch-gora/src/it/resources/log4j.properties
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/AtGora.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/FromGora.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraData.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterable.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterator.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraPairConverter.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraSourceTarget.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraTarget.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraValueConverter.java
 create mode 100644 crunch-gora/src/main/java/org/apache/crunch/io/gora/ToGora.java

diff --git a/crunch-gora/pom.xml b/crunch-gora/pom.xml
new file mode 100644
index 0000000..9f2f160
--- /dev/null
+++ b/crunch-gora/pom.xml
@@ -0,0 +1,202 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	you under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.crunch</groupId>
+		<artifactId>crunch-parent</artifactId>
+		<version>0.13.0-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>crunch-gora</artifactId>
+	<name>Apache Crunch Gora Support</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.crunch</groupId>
+			<artifactId>crunch-core</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>commons-lang</groupId>
+			<artifactId>commons-lang</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-client</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.gora</groupId>
+			<artifactId>gora-core</artifactId>
+			<version>0.6</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.crunch</groupId>
+			<artifactId>crunch-test</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-minicluster</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>commons-io</groupId>
+			<artifactId>commons-io</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<profiles>
+		<profile>
+			<id>hadoop-1</id>
+			<activation>
+				<property>
+					<name>!crunch.platform</name>
+				</property>
+			</activation>
+		</profile>
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<name>crunch.platform</name>
+					<value>2</value>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-auth</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+					<type>test-jar</type>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-annotations</artifactId>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>create-mrapp-generated-classpath</id>
+						<phase>generate-test-resources</phase>
+						<goals>
+							<goal>build-classpath</goal>
+						</goals>
+						<configuration>
+							<!-- needed to run the unit test for DS to generate the required classpath 
+								that is required in the env of the launch container in the mini mr/yarn cluster -->
+							<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-failsafe-plugin</artifactId>
+			</plugin>
+		</plugins>
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											org.apache.maven.plugins
+										</groupId>
+										<artifactId>
+											maven-dependency-plugin
+										</artifactId>
+										<versionRange>
+											[2.8,)
+										</versionRange>
+										<goals>
+											<goal>build-classpath</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+</project>
diff --git a/crunch-gora/src/it/java/com/example/goratest/MetricDatum.java b/crunch-gora/src/it/java/com/example/goratest/MetricDatum.java
new file mode 100644
index 0000000..3ff92e4
--- /dev/null
+++ b/crunch-gora/src/it/java/com/example/goratest/MetricDatum.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.goratest;  
+@SuppressWarnings("all")
+public class MetricDatum extends org.apache.gora.persistency.impl.PersistentBase implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"MetricDatum\",\"namespace\":\"com.example.goratest\",\"fields\":[{\"name\":\"metricDimension\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp\",\"type\":\"long\",\"default\":0},{\"name\":\"metric\",\"type\":\"long\",\"default\":0}],\"default\":null}");
+
+  /** Enum containing all data bean's fields. */
+  public static enum Field {
+    METRIC_DIMENSION(0, "metricDimension"),
+    TIMESTAMP(1, "timestamp"),
+    METRIC(2, "metric"),
+    ;
+    /**
+     * Field's index.
+     */
+    private int index;
+
+    /**
+     * Field's name.
+     */
+    private String name;
+
+    /**
+     * Field's constructor
+     * @param index field's index.
+     * @param name field's name.
+     */
+    Field(int index, String name) {this.index=index;this.name=name;}
+
+    /**
+     * Gets field's index.
+     * @return int field's index.
+     */
+    public int getIndex() {return index;}
+
+    /**
+     * Gets field's name.
+     * @return String field's name.
+     */
+    public String getName() {return name;}
+
+    /**
+     * Gets field's attributes to string.
+     * @return String field's attributes to string.
+     */
+    public String toString() {return name;}
+  };
+
+  public static final String[] _ALL_FIELDS = {
+  "metricDimension",
+  "timestamp",
+  "metric",
+  };
+
+  /**
+   * Gets the total field count.
+   * @return int field count
+   */
+  public int getFieldsCount() {
+    return MetricDatum._ALL_FIELDS.length;
+  }
+
+  private java.lang.CharSequence metricDimension;
+  private long timestamp;
+  private long metric;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return metricDimension;
+    case 1: return timestamp;
+    case 2: return metric;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value) {
+    switch (field$) {
+    case 0: metricDimension = (java.lang.CharSequence)(value); break;
+    case 1: timestamp = (java.lang.Long)(value); break;
+    case 2: metric = (java.lang.Long)(value); break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'metricDimension' field.
+   */
+  public java.lang.CharSequence getMetricDimension() {
+    return metricDimension;
+  }
+
+  /**
+   * Sets the value of the 'metricDimension' field.
+   * @param value the value to set.
+   */
+  public void setMetricDimension(java.lang.CharSequence value) {
+    this.metricDimension = value;
+    setDirty(0);
+  }
+  
+  /**
+   * Checks the dirty status of the 'metricDimension' field. A field is dirty if it represents a change that has not yet been written to the database.
+   * @param value the value to set.
+   */
+  public boolean isMetricDimensionDirty() {
+    return isDirty(0);
+  }
+
+  /**
+   * Gets the value of the 'timestamp' field.
+   */
+  public java.lang.Long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * Sets the value of the 'timestamp' field.
+   * @param value the value to set.
+   */
+  public void setTimestamp(java.lang.Long value) {
+    this.timestamp = value;
+    setDirty(1);
+  }
+  
+  /**
+   * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database.
+   * @param value the value to set.
+   */
+  public boolean isTimestampDirty() {
+    return isDirty(1);
+  }
+
+  /**
+   * Gets the value of the 'metric' field.
+   */
+  public java.lang.Long getMetric() {
+    return metric;
+  }
+
+  /**
+   * Sets the value of the 'metric' field.
+   * @param value the value to set.
+   */
+  public void setMetric(java.lang.Long value) {
+    this.metric = value;
+    setDirty(2);
+  }
+  
+  /**
+   * Checks the dirty status of the 'metric' field. A field is dirty if it represents a change that has not yet been written to the database.
+   * @param value the value to set.
+   */
+  public boolean isMetricDirty() {
+    return isDirty(2);
+  }
+
+  /** Creates a new MetricDatum RecordBuilder */
+  public static com.example.goratest.MetricDatum.Builder newBuilder() {
+    return new com.example.goratest.MetricDatum.Builder();
+  }
+  
+  /** Creates a new MetricDatum RecordBuilder by copying an existing Builder */
+  public static com.example.goratest.MetricDatum.Builder newBuilder(com.example.goratest.MetricDatum.Builder other) {
+    return new com.example.goratest.MetricDatum.Builder(other);
+  }
+  
+  /** Creates a new MetricDatum RecordBuilder by copying an existing MetricDatum instance */
+  public static com.example.goratest.MetricDatum.Builder newBuilder(com.example.goratest.MetricDatum other) {
+    return new com.example.goratest.MetricDatum.Builder(other);
+  }
+  
+  private static java.nio.ByteBuffer deepCopyToReadOnlyBuffer(
+      java.nio.ByteBuffer input) {
+    java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(input.capacity());
+    int position = input.position();
+    input.reset();
+    int mark = input.position();
+    int limit = input.limit();
+    input.rewind();
+    input.limit(input.capacity());
+    copy.put(input);
+    input.rewind();
+    copy.rewind();
+    input.position(mark);
+    input.mark();
+    copy.position(mark);
+    copy.mark();
+    input.position(position);
+    copy.position(position);
+    input.limit(limit);
+    copy.limit(limit);
+    return copy.asReadOnlyBuffer();
+  }
+  
+  /**
+   * RecordBuilder for MetricDatum instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<MetricDatum>
+    implements org.apache.avro.data.RecordBuilder<MetricDatum> {
+
+    private java.lang.CharSequence metricDimension;
+    private long timestamp;
+    private long metric;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(com.example.goratest.MetricDatum.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(com.example.goratest.MetricDatum.Builder other) {
+      super(other);
+    }
+    
+    /** Creates a Builder by copying an existing MetricDatum instance */
+    private Builder(com.example.goratest.MetricDatum other) {
+            super(com.example.goratest.MetricDatum.SCHEMA$);
+      if (isValidValue(fields()[0], other.metricDimension)) {
+        this.metricDimension = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.metricDimension);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.timestamp)) {
+        this.timestamp = (java.lang.Long) data().deepCopy(fields()[1].schema(), other.timestamp);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.metric)) {
+        this.metric = (java.lang.Long) data().deepCopy(fields()[2].schema(), other.metric);
+        fieldSetFlags()[2] = true;
+      }
+    }
+
+    /** Gets the value of the 'metricDimension' field */
+    public java.lang.CharSequence getMetricDimension() {
+      return metricDimension;
+    }
+    
+    /** Sets the value of the 'metricDimension' field */
+    public com.example.goratest.MetricDatum.Builder setMetricDimension(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.metricDimension = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'metricDimension' field has been set */
+    public boolean hasMetricDimension() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'metricDimension' field */
+    public com.example.goratest.MetricDatum.Builder clearMetricDimension() {
+      metricDimension = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+    
+    /** Gets the value of the 'timestamp' field */
+    public java.lang.Long getTimestamp() {
+      return timestamp;
+    }
+    
+    /** Sets the value of the 'timestamp' field */
+    public com.example.goratest.MetricDatum.Builder setTimestamp(long value) {
+      validate(fields()[1], value);
+      this.timestamp = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'timestamp' field has been set */
+    public boolean hasTimestamp() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the value of the 'timestamp' field */
+    public com.example.goratest.MetricDatum.Builder clearTimestamp() {
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+    
+    /** Gets the value of the 'metric' field */
+    public java.lang.Long getMetric() {
+      return metric;
+    }
+    
+    /** Sets the value of the 'metric' field */
+    public com.example.goratest.MetricDatum.Builder setMetric(long value) {
+      validate(fields()[2], value);
+      this.metric = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'metric' field has been set */
+    public boolean hasMetric() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'metric' field */
+    public com.example.goratest.MetricDatum.Builder clearMetric() {
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+    
+    @Override
+    public MetricDatum build() {
+      try {
+        MetricDatum record = new MetricDatum();
+        record.metricDimension = fieldSetFlags()[0] ? this.metricDimension : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.timestamp = fieldSetFlags()[1] ? this.timestamp : (java.lang.Long) defaultValue(fields()[1]);
+        record.metric = fieldSetFlags()[2] ? this.metric : (java.lang.Long) defaultValue(fields()[2]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+  
+  public MetricDatum.Tombstone getTombstone(){
+  	return TOMBSTONE;
+  }
+
+  public MetricDatum newInstance(){
+    return newBuilder().build();
+  }
+
+  private static final Tombstone TOMBSTONE = new Tombstone();
+  
+  public static final class Tombstone extends MetricDatum implements org.apache.gora.persistency.Tombstone {
+  
+      private Tombstone() { }
+  
+	  		  /**
+	   * Gets the value of the 'metricDimension' field.
+		   */
+	  public java.lang.CharSequence getMetricDimension() {
+	    throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+	  }
+	
+	  /**
+	   * Sets the value of the 'metricDimension' field.
+		   * @param value the value to set.
+	   */
+	  public void setMetricDimension(java.lang.CharSequence value) {
+	    throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+	  }
+	  
+	  /**
+	   * Checks the dirty status of the 'metricDimension' field. A field is dirty if it represents a change that has not yet been written to the database.
+		   * @param value the value to set.
+	   */
+	  public boolean isMetricDimensionDirty() {
+	    throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+	  }
+	
+				  /**
+	   * Gets the value of the 'timestamp' field.
+		   */
+	  public java.lang.Long getTimestamp() {
+	    throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+	  }
+	
+	  /**
+	   * Sets the value of the 'timestamp' field.
+		   * @param value the value to set.
+	   */
+	  public void setTimestamp(java.lang.Long value) {
+	    throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+	  }
+	  
+	  /**
+	   * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database.
+		   * @param value the value to set.
+	   */
+	  public boolean isTimestampDirty() {
+	    throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+	  }
+	
+				  /**
+	   * Gets the value of the 'metric' field.
+		   */
+	  public java.lang.Long getMetric() {
+	    throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones");
+	  }
+	
+	  /**
+	   * Sets the value of the 'metric' field.
+		   * @param value the value to set.
+	   */
+	  public void setMetric(java.lang.Long value) {
+	    throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones");
+	  }
+	  
+	  /**
+	   * Checks the dirty status of the 'metric' field. A field is dirty if it represents a change that has not yet been written to the database.
+		   * @param value the value to set.
+	   */
+	  public boolean isMetricDirty() {
+	    throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones");
+	  }
+	
+		  
+  }
+  
+}
+
diff --git a/crunch-gora/src/it/java/org/apache/crunch/io/gora/GoraSourceIT.java b/crunch-gora/src/it/java/org/apache/crunch/io/gora/GoraSourceIT.java
new file mode 100644
index 0000000..7df2484
--- /dev/null
+++ b/crunch-gora/src/it/java/org/apache/crunch/io/gora/GoraSourceIT.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.gora.memory.store.MemStore;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.example.goratest.MetricDatum;
+
+public class GoraSourceIT {
+
+  @Test
+  public void testGoraSource() throws GoraException {
+    DataStore<Long, MetricDatum> dataStore = DataStoreFactory.createDataStore(MemStore.class, Long.class,
+        MetricDatum.class, new Configuration());
+    dataStore.put(123L, MetricDatum.newBuilder().build());
+    dataStore.put(456L, MetricDatum.newBuilder().build());
+    Source<Pair<Long, MetricDatum>> source = FromGora.store(dataStore, Long.MIN_VALUE, Long.MAX_VALUE,
+        Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class)));
+    Pipeline pipeline = MemPipeline.getInstance();
+    PCollection<Pair<Long, MetricDatum>> res = pipeline.read(source);
+    for (Pair<Long, MetricDatum> pair : res.materialize()) {
+      System.out.println("key: " + pair.first());
+      System.out.println("value: " + pair.second());
+    }
+    pipeline.done();
+    System.out.println("length: " + res.length().getValue());
+    System.out.println("Done testGoraSource**************************************\n");
+  }
+
+  @Test
+  public void testGoraSource2() throws GoraException {
+    DataStore<Long, MetricDatum> dataStore = DataStoreFactory.createDataStore(MemStore.class, Long.class,
+        MetricDatum.class, new Configuration());
+    dataStore.put(123L, MetricDatum.newBuilder().setMetric(700).build());
+    dataStore.put(456L, MetricDatum.newBuilder().setMetric(500).build());
+    Source<Pair<Long, MetricDatum>> source = FromGora.store(dataStore, Long.MIN_VALUE, Long.MAX_VALUE,
+        Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class)));
+    Pipeline pipeline = MemPipeline.getInstance();
+    PCollection<Pair<Long, MetricDatum>> res = pipeline.read(source);
+    PCollection<Long> metrics = res.parallelDo(new DoubleMetric(), Avros.longs());
+    System.out.println("metric values...");
+    for (Long metric : metrics.materialize()) {
+      System.out.println("metric: " + metric);
+    }
+    pipeline.done();
+    System.out.println("Done testGoraSource2**************************************\n");
+  }
+
+  public static class DoubleMetric extends DoFn<Pair<Long, MetricDatum>, Long> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void process(Pair<Long, MetricDatum> input, Emitter<Long> emitter) {
+      emitter.emit(input.second().getMetric() * 2);
+    }
+  }
+
+  @Test
+  public void testGoraTarget() throws GoraException {
+    DataStore<Long, MetricDatum> dataStore = DataStoreFactory.createDataStore(MemStore.class, Long.class,
+        MetricDatum.class, new Configuration());
+    dataStore.put(123L, MetricDatum.newBuilder().setMetric(700).build());
+    dataStore.put(456L, MetricDatum.newBuilder().setMetric(500).build());
+    SourceTarget<Pair<Long, MetricDatum>> sourceTarget = AtGora.store(dataStore, Long.MIN_VALUE, Long.MAX_VALUE,
+        Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class)));
+    Pipeline pipeline = new MRPipeline(GoraSourceIT.class, new Configuration());
+    PCollection<Pair<Long, MetricDatum>> res = pipeline.read(sourceTarget);
+    PCollection<Pair<Long, MetricDatum>> metrics = res.parallelDo(new ModifMetric(),
+        Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class)));
+    pipeline.write(metrics, sourceTarget);
+    System.out.println("length: " + res.length().getValue());
+    pipeline.done();
+    System.out.println("Done testGoraTarget**************************************\n");
+  }
+
+  @Test
+  public void testGoraTarget2() throws GoraException {
+    DataStore<Long, MetricDatum> dataStore = DataStoreFactory.createDataStore(MemStore.class, Long.class,
+        MetricDatum.class, new Configuration());
+    dataStore.put(123L, MetricDatum.newBuilder().setMetric(700).build());
+    dataStore.put(456L, MetricDatum.newBuilder().setMetric(500).build());
+    SourceTarget<Pair<Long, MetricDatum>> sourceTarget = AtGora.store(dataStore, Long.MIN_VALUE, Long.MAX_VALUE,
+        Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class)));
+    Pipeline pipeline = MemPipeline.getInstance();
+    PCollection<Pair<Long, MetricDatum>> res = pipeline.read(sourceTarget);
+    PCollection<Pair<Long, MetricDatum>> metrics = res.parallelDo(new ModifMetric(),
+        Avros.pairs(Avros.longs(), Avros.records(MetricDatum.class)));
+    pipeline.write(metrics, sourceTarget);
+    System.out.println("length: " + res.length().getValue());
+    pipeline.done();
+    System.out.println("Done testGoraTarget2**************************************\n");
+  }
+
+  public static class ModifMetric extends DoFn<Pair<Long, MetricDatum>, Pair<Long, MetricDatum>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void process(Pair<Long, MetricDatum> input, Emitter<Pair<Long, MetricDatum>> emitter) {
+      Long newMetric = input.second().getMetric() * 3;
+      input.second().setMetric(newMetric);
+      emitter.emit(input);
+    }
+  }
+}
diff --git a/crunch-gora/src/it/resources/log4j.properties b/crunch-gora/src/it/resources/log4j.properties
new file mode 100644
index 0000000..5d144a0
--- /dev/null
+++ b/crunch-gora/src/it/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ***** Set the org.apache.crunch logger level to INFO and its only appender to A.
+log4j.logger.org.apache.crunch=info, A
+
+# Log warnings on Hadoop for the local runner when testing
+log4j.logger.org.apache.hadoop=warn, A
+# Except for Configuration, which is chatty.
+log4j.logger.org.apache.hadoop.conf.Configuration=error, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/AtGora.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/AtGora.java
new file mode 100644
index 0000000..933ac3b
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/AtGora.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.types.PType;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Static factory methods for creating Gora {@link SourceTarget} types.
+ */
+public class AtGora {
+
+  public static <K, V extends Persistent> GoraSourceTarget<K, V> store(DataStore<K, V> dataStore, K startKey, K endKey,
+      PType<Pair<K, V>> ptype) {
+    return new GoraSourceTarget<K, V>(dataStore, startKey, endKey, ptype);
+  }
+
+}
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/FromGora.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/FromGora.java
new file mode 100644
index 0000000..27c3f04
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/FromGora.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Source;
+import org.apache.crunch.types.PType;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Static factory methods for creating Gora {@link Source} types.
+ */
+public class FromGora {
+
+  public static <K, V extends Persistent> Source<Pair<K, V>> store(DataStore<K, V> dataStore, K startKey, K endKey,
+      PType<Pair<K, V>> ptype) {
+    return new GoraSourceTarget<K, V>(dataStore, startKey, endKey, ptype);
+  }
+}
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraData.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraData.java
new file mode 100644
index 0000000..7b70452
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraData.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.google.common.collect.ImmutableSet;
+
+public class GoraData<K, V extends Persistent> implements ReadableData<Pair<K, V>> {
+
+  private DataStore<K, V> dataStore;
+  private K startKey;
+  private K endKey;
+  private transient SourceTarget<?> parent;
+
+  public GoraData(DataStore<K, V> dataStore, K startKey, K endKey, SourceTarget<?> parent) {
+    this.dataStore = dataStore;
+    this.startKey = startKey;
+    this.endKey = endKey;
+    this.parent = parent;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    if (parent != null) {
+      return ImmutableSet.<SourceTarget<?>> of(parent);
+    } else {
+      return ImmutableSet.of();
+    }
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    // No-op
+  }
+
+  @Override
+  public Iterable<Pair<K, V>> read(TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException {
+    return new GoraIterable<K, V>(dataStore, startKey, endKey);
+  }
+}
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterable.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterable.java
new file mode 100644
index 0000000..36aa28a
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterable.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import java.util.Iterator;
+
+import org.apache.crunch.Pair;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+
+class GoraIterable<K, V extends Persistent> implements Iterable<Pair<K, V>> {
+
+  private DataStore<K, V> dataStore;
+  private K startKey;
+  private K endKey;
+
+  public GoraIterable(DataStore<K, V> dataStore, K startKey, K endKey) {
+    this.dataStore = dataStore;
+    this.startKey = startKey;
+    this.endKey = endKey;
+  }
+
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    Query<K, V> query = dataStore.newQuery();
+    query.setKeyRange(startKey, endKey);
+    Result<K, V> result = query.execute();
+    return new GoraIterator<K, V>(result);
+  }
+}
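GoraIterable runs a new query every time iterator() is called. Each pass over the Iterable therefore re-reads the store instead of replaying a cached result.

The stdlib-only sketch below models that behaviour. It rests on two assumptions:
- the DataStore is modelled as a TreeMap;
- setKeyRange(startKey, endKey) is inclusive at both ends.

RangeIterable and its executions counter are hypothetical names for illustration. They are not part of the patch or of Gora.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for GoraIterable: the data store is modelled as a
// NavigableMap, and the key range is ASSUMED to be inclusive at both ends.
class RangeIterable<K, V> implements Iterable<Map.Entry<K, V>> {

  private final NavigableMap<K, V> store;
  private final K startKey;
  private final K endKey;
  // Number of times the range "query" has been executed (illustration only).
  int executions = 0;

  RangeIterable(NavigableMap<K, V> store, K startKey, K endKey) {
    this.store = store;
    this.startKey = startKey;
    this.endKey = endKey;
  }

  @Override
  public Iterator<Map.Entry<K, V>> iterator() {
    // Each call executes a fresh range query, so every pass sees the
    // store's current contents; nothing is cached between passes.
    executions++;
    return store.subMap(startKey, true, endKey, true).entrySet().iterator();
  }

  public static void main(String[] args) {
    TreeMap<Long, Integer> store = new TreeMap<>();
    store.put(123L, 700);
    store.put(456L, 500);
    store.put(789L, 100);
    RangeIterable<Long, Integer> range = new RangeIterable<>(store, 100L, 456L);
    for (Map.Entry<Long, Integer> e : range) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}
```

With an inverted range (startKey greater than endKey), TreeMap.subMap throws IllegalArgumentException. A real implementation may want to validate the bounds up front.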
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterator.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterator.java
new file mode 100644
index 0000000..ef6b26f
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraIterator.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Pair;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Result;
+
+class GoraIterator<K, V extends Persistent> implements Iterator<Pair<K, V>> {
+
+  private Result<K, V> result;
+  private boolean hasNext;
+
+  public GoraIterator(Result<K, V> result) {
+    this.result = result;
+    try {
+      hasNext = result.next();
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return hasNext;
+  }
+
+  @Override
+  public Pair<K, V> next() {
+    if (!hasNext())
+      throw new NoSuchElementException("no more elements to iterate");
+    Pair<K, V> next = Pair.of(result.getKey(), result.get());
+    try {
+      hasNext = result.next();
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+    return next;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}
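GoraIterator adapts Gora's cursor-style Result to a java.util.Iterator. The Result is advanced with next() and then read with getKey() and get(). GoraIterator fetches one row ahead, so hasNext() is just a field read.

The sketch below isolates that look-ahead pattern using only the standard library. Cursor, ListCursor and CursorIterator are hypothetical stand-ins, and Cursor is assumed to match how GoraIterator uses Result.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical cursor interface, ASSUMED to mirror the shape of Gora's
// Result as used by GoraIterator: next() advances, getKey()/get() read the row.
interface Cursor<K, V> {
  boolean next() throws Exception;
  K getKey();
  V get();
}

// Minimal in-memory cursor over two parallel lists, for demonstration only.
class ListCursor<K, V> implements Cursor<K, V> {
  private final List<K> keys;
  private final List<V> values;
  private int pos = -1;

  ListCursor(List<K> keys, List<V> values) {
    this.keys = keys;
    this.values = values;
  }

  @Override
  public boolean next() {
    pos++;
    return pos < keys.size();
  }

  @Override
  public K getKey() {
    return keys.get(pos);
  }

  @Override
  public V get() {
    return values.get(pos);
  }
}

// Adapts a cursor to java.util.Iterator by reading one row ahead,
// the same strategy GoraIterator uses.
class CursorIterator<K, V> implements Iterator<Map.Entry<K, V>> {
  private final Cursor<K, V> cursor;
  private boolean hasNext;

  CursorIterator(Cursor<K, V> cursor) {
    this.cursor = cursor;
    this.hasNext = advance();
  }

  private boolean advance() {
    try {
      return cursor.next();
    } catch (Exception e) {
      // One catch suffices: IOException is a subclass of Exception.
      throw new RuntimeException(e);
    }
  }

  @Override
  public boolean hasNext() {
    return hasNext;
  }

  @Override
  public Map.Entry<K, V> next() {
    if (!hasNext) {
      throw new NoSuchElementException("no more elements to iterate");
    }
    // Capture the current row before advancing the cursor.
    Map.Entry<K, V> current =
        new AbstractMap.SimpleImmutableEntry<>(cursor.getKey(), cursor.get());
    hasNext = advance();
    return current;
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }

  public static void main(String[] args) {
    CursorIterator<String, Integer> it = new CursorIterator<>(
        new ListCursor<>(Arrays.asList("a", "b"), Arrays.asList(1, 2)));
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}
```

The row is captured before the cursor advances. Correctness therefore depends on the cursor not mutating the returned value object in place. It is worth checking whether a particular Gora store's Result reuses its persistent instance between rows. If it does, the value would need to be copied before next() is called again.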
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraPairConverter.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraPairConverter.java
new file mode 100644
index 0000000..a92a17c
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraPairConverter.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.crunch.io.gora;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.Converter;
+
+class GoraPairConverter<K, V> implements Converter<K, V, Pair<K, V>, Pair<K, Iterable<V>>> {
+
+  private Class<K> keyClass;
+  private Class<V> valueClass;
+
+  public GoraPairConverter(Class<K> keyClass, Class<V> valueClass) {
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+  }
+
+  @Override
+  public Pair<K, V> convertInput(K key, V value) {
+    return Pair.of(key, value);
+  }
+
+  @Override
+  public K outputKey(Pair<K, V> value) {
+    return value.first();
+  }
+
+  @Override
+  public V outputValue(Pair<K, V> value) {
+    return value.second();
+  }
+
+  @Override
+  public Class<K> getKeyClass() {
+    return keyClass;
+  }
+
+  @Override
+  public Class<V> getValueClass() {
+    return valueClass;
+  }
+
+  @Override
+  public boolean applyPTypeTransforms() {
+    return false;
+  }
+
+  @Override
+  public Pair<K, Iterable<V>> convertIterableInput(K key, Iterable<V> value) {
+    return Pair.of(key, value);
+  }
+}
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraSourceTarget.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraSourceTarget.java
new file mode 100644
index 0000000..e18b8b8
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraSourceTarget.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.gora.mapreduce.GoraInputFormat;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GoraSourceTarget<K, V extends Persistent> extends GoraTarget<K, V> implements
+    ReadableSourceTarget<Pair<K, V>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GoraSourceTarget.class);
+
+  private K startKey;
+  private K endKey;
+  private PType<Pair<K, V>> ptype;
+
+  public GoraSourceTarget(DataStore<K, V> dataStore, K startKey, K endKey, PType<Pair<K, V>> ptype) {
+    super(dataStore);
+    this.startKey = startKey;
+    this.endKey = endKey;
+    this.ptype = ptype;
+  }
+
+  @Override
+  public Source<Pair<K, V>> inputConf(String key, String value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PType<Pair<K, V>> getType() {
+    return ptype;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof GoraSourceTarget)) {
+      return false;
+    }
+    GoraSourceTarget<?, ?> o = (GoraSourceTarget<?, ?>) other;
+    if (!super.equals(o))
+      return false;
+    if (!ptype.equals(o.ptype))
+      return false;
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(dataStore).append(ptype).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "GoraDataStore(" + dataStore + ")";
+  }
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    Query<K, V> query = dataStore.newQuery();
+    query.setKeyRange(startKey, endKey);
+    GoraInputFormat.setInput(job, query, true);
+  }
+
+  @Override
+  public long getSize(Configuration conf) {
+    // TODO something smarter here.
+    return 1000L * 1000L * 1000L;
+  }
+
+  @Override
+  public long getLastModifiedAt(Configuration configuration) {
+    LOG.warn("Cannot determine last modified time for source: {}", toString());
+    return -1;
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return new GoraPairConverter<K, V>(dataStore.getKeyClass(), dataStore.getPersistentClass());
+  }
+
+  @Override
+  public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
+    return new GoraIterable<K, V>(dataStore, startKey, endKey);
+  }
+
+  @Override
+  public ReadableData<Pair<K, V>> asReadable() {
+    return new GoraData<K, V>(dataStore, startKey, endKey, this);
+  }
+
+  @Override
+  public SourceTarget<Pair<K, V>> conf(String key, String value) {
+    inputConf(key, value);
+    outputConf(key, value);
+    return this;
+  }
+}
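The MRPipeline failure quoted above ends in TreeMap.firstKey(), which throws NoSuchElementException when the map is empty. The sketch below reproduces that pattern. It assumes, from the stack trace only, that MemStore falls back to firstKey()/lastKey() when a query has no start or end key.

One possible explanation, not verified here, is that the map task runs its query against a MemStore instance that does not hold the rows put() by the test, since a MemStore lives in the memory of the JVM that created it. InMemoryRangeQuery is a hypothetical name.

```java
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Hypothetical model of an in-memory range query. It ASSUMES, from the
// stack trace alone, that an unset start/end key is replaced by the backing
// TreeMap's firstKey()/lastKey(); this is not Gora's actual code.
class InMemoryRangeQuery {

  // Throws NoSuchElementException when a bound is null and the map is empty,
  // because TreeMap.firstKey() and lastKey() throw on an empty map.
  static <K, V> NavigableMap<K, V> naive(TreeMap<K, V> map, K start, K end) {
    K from = (start != null) ? start : map.firstKey();
    K to = (end != null) ? end : map.lastKey();
    return map.subMap(from, true, to, true);
  }

  // Defensive variant: an empty store yields an empty result instead.
  static <K, V> NavigableMap<K, V> safe(TreeMap<K, V> map, K start, K end) {
    if (map.isEmpty()) {
      return new TreeMap<K, V>(map.comparator());
    }
    return naive(map, start, end);
  }

  public static void main(String[] args) {
    TreeMap<Long, String> empty = new TreeMap<>();
    System.out.println("safe: " + safe(empty, null, null));
    try {
      naive(empty, null, null);
    } catch (NoSuchElementException e) {
      System.out.println("naive: NoSuchElementException");
    }
  }
}
```

With both bounds set, the same empty map yields an empty result. The exception would then become a silent zero-row read, so setting a key range alone may not make a MemStore-backed source usable under MRPipeline.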
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraTarget.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraTarget.java
new file mode 100644
index 0000000..96bfcba
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraTarget.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import java.util.Map;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.gora.mapreduce.GoraOutputFormat;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class GoraTarget<K, V extends Persistent> implements MapReduceTarget {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GoraTarget.class);
+
+  protected DataStore<K, V> dataStore;
+  private Map<String, String> extraConf = Maps.newHashMap();
+
+  public GoraTarget(DataStore<K, V> dataStore) {
+    this.dataStore = dataStore;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other)
+      return true;
+    if (other == null)
+      return false;
+    if (!other.getClass().equals(getClass()))
+      return false;
+    GoraTarget<?, ?> o = (GoraTarget<?, ?>) other;
+    return dataStore.equals(o.dataStore);
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(dataStore).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "GoraDataStore(" + dataStore + ")";
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    GoraOutputFormat.setOutput(job, dataStore, true);
+    final Configuration conf = job.getConfiguration();
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      conf.set(e.getKey(), e.getValue());
+    }
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    return null;
+  }
+
+  @Override
+  public Target outputConf(String key, String value) {
+    extraConf.put(key, value);
+    return this;
+  }
+
+  @Override
+  public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) {
+    LOG.info("GoraTarget ignores checks for existing outputs...");
+    return false;
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter(final PType<?> ptype) {
+    return new GoraPairConverter<K, V>(dataStore.getKeyClass(), dataStore.getPersistentClass());
+  }
+}
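This bears on the open question of whether GoraSourceTarget should be a TableSource, given that a Gora DataStore is a map rather than a multimap. The sketch below shows what writing a PTable with repeated keys to a map-semantics target would do: only one value per key survives.

It assumes the store's put() overwrites an existing key. MapSemanticsDemo is a hypothetical name.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of writing Pair<K, V> records into a store with map
// semantics. It ASSUMES that put() on an existing key overwrites the old
// value, as a key-value DataStore would; it is not Gora's API.
class MapSemanticsDemo {

  static <K, V> Map<K, V> writeAll(List<Map.Entry<K, V>> pairs) {
    Map<K, V> store = new HashMap<>();
    for (Map.Entry<K, V> pair : pairs) {
      // A later pair with the same key silently replaces the earlier one.
      store.put(pair.getKey(), pair.getValue());
    }
    return store;
  }

  public static void main(String[] args) {
    List<Map.Entry<Long, Integer>> pairs = Arrays.<Map.Entry<Long, Integer>>asList(
        new AbstractMap.SimpleImmutableEntry<>(123L, 700),
        new AbstractMap.SimpleImmutableEntry<>(123L, 2100),
        new AbstractMap.SimpleImmutableEntry<>(456L, 500));
    Map<Long, Integer> store = writeAll(pairs);
    System.out.println(pairs.size() + " pairs written, " + store.size() + " keys stored");
  }
}
```

In a distributed job, which value survives would depend on write order, and that order is generally not deterministic. A TableSource-style API over Gora might therefore need to document, or reject, duplicate keys.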
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraValueConverter.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraValueConverter.java
new file mode 100644
index 0000000..2f9d709
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/GoraValueConverter.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.crunch.io.gora;
+
+import org.apache.crunch.types.Converter;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.util.Null;
+
+public class GoraValueConverter<V extends Persistent> implements Converter<Object, V, V, Iterable<V>> {
+
+  private final Class<V> serializationClass;
+
+  public GoraValueConverter(Class<V> serializationClass) {
+    this.serializationClass = serializationClass;
+  }
+
+  @Override
+  public V convertInput(Object key, V value) {
+    return value;
+  }
+
+  @Override
+  public Object outputKey(V value) {
+    return Null.get();
+  }
+
+  @Override
+  public V outputValue(V value) {
+    return value;
+  }
+
+  @Override
+  public Class<Object> getKeyClass() {
+    return (Class<Object>) (Class<?>) Null.class;
+  }
+
+  @Override
+  public Class<V> getValueClass() {
+    return serializationClass;
+  }
+
+  @Override
+  public boolean applyPTypeTransforms() {
+    return false;
+  }
+
+  @Override
+  public Iterable<V> convertIterableInput(Object key, Iterable<V> value) {
+    return value;
+  }
+}
diff --git a/crunch-gora/src/main/java/org/apache/crunch/io/gora/ToGora.java b/crunch-gora/src/main/java/org/apache/crunch/io/gora/ToGora.java
new file mode 100644
index 0000000..345d7ec
--- /dev/null
+++ b/crunch-gora/src/main/java/org/apache/crunch/io/gora/ToGora.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.gora;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Static factory methods for creating Gora {@link Target} types.
+ */
+public class ToGora {
+
+  public static <K, V extends Persistent> Target store(DataStore<K, V> dataStore, PType<Pair<K, V>> ptype) {
+    return new GoraTarget<K, V>(dataStore);
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index d47f6ff..22a1b5c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@ under the License.
   <modules>
     <module>crunch-core</module>
     <module>crunch-hbase</module>
+    <module>crunch-gora</module>
     <module>crunch-test</module>
     <module>crunch-contrib</module>
     <module>crunch-examples</module>
-- 
2.1.4
