Repository: crunch
Updated Branches:
  refs/heads/master b0491f20a -> 157ae25b4
CRUNCH-611: Added API for offset reading/writing along with a simple implementation that supports doing it from HDFS.

A short usage sketch of the new API follows the diff at the end of this message.

Signed-off-by: Micah Whitacre <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/157ae25b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/157ae25b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/157ae25b

Branch: refs/heads/master
Commit: 157ae25b48ba5fb46001df7bb05c753a188ade8d
Parents: b0491f2
Author: Micah Whitacre <[email protected]>
Authored: Wed Jul 13 10:18:17 2016 -0500
Committer: Micah Whitacre <[email protected]>
Committed: Sat Jul 30 15:42:40 2016 -0500

----------------------------------------------------------------------
 crunch-kafka/pom.xml                            |  19 +
 .../kafka/offset/AbstractOffsetReader.java      |  24 ++
 .../kafka/offset/AbstractOffsetWriter.java      |  17 +
 .../crunch/kafka/offset/OffsetReader.java       |  44 ++
 .../crunch/kafka/offset/OffsetWriter.java       |  34 ++
 .../kafka/offset/hdfs/HDFSOffsetReader.java     | 142 +++++++
 .../kafka/offset/hdfs/HDFSOffsetWriter.java     | 165 ++++++++
 .../crunch/kafka/offset/hdfs/Offsets.java       | 316 ++++++++++++++
 .../kafka/offset/hdfs/HDFSOffsetReaderTest.java | 220 ++++++++++
 .../kafka/offset/hdfs/HDFSOffsetWriterTest.java | 185 ++++++++
 .../crunch/kafka/offset/hdfs/OffsetsTest.java   | 418 +++++++++++++++++++
 pom.xml                                         |  19 +
 12 files changed, 1603 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
index 7d6256b..dddf859 100644
--- a/crunch-kafka/pom.xml
+++ b/crunch-kafka/pom.xml
@@ -52,6 +52,25 @@ under the License.
<artifactId>hadoop-client</artifactId> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <scope>compile</scope> + <optional>true</optional> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <scope>compile</scope> + <optional>true</optional> + </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <scope>compile</scope> + <optional>true</optional> + </dependency> + <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetReader.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetReader.java new file mode 100644 index 0000000..0856c64 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetReader.java @@ -0,0 +1,24 @@ +package org.apache.crunch.kafka.offset; + +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Base implementation of {@link OffsetReader} + */ +public abstract class AbstractOffsetReader implements OffsetReader { + + @Override + public Map<TopicPartition, Long> readOffsets(long persistedOffsetTime) throws IOException { + throw new UnsupportedOperationException("Operation to read old offsets is not supported"); + } + + @Override + public List<Long> getStoredOffsetPersistenceTimes() throws IOException { + throw new UnsupportedOperationException("Operation to retrieve old offset persistence times is not supported"); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetWriter.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetWriter.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetWriter.java new file mode 100644 index 0000000..493a499 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetWriter.java @@ -0,0 +1,17 @@ +package org.apache.crunch.kafka.offset; + +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; +import java.util.Map; + +/** + * Base implementation of {@link OffsetWriter} + */ +public abstract class AbstractOffsetWriter implements OffsetWriter { + + @Override + public void write(Map<TopicPartition, Long> offsets) throws IOException { + write(System.currentTimeMillis(), offsets); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetReader.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetReader.java new file mode 100644 index 0000000..4d4056b --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetReader.java @@ -0,0 +1,44 @@ +package org.apache.crunch.kafka.offset; + +import org.apache.kafka.common.TopicPartition; + +import java.io.Closeable; +import java.io.IOException; +import 
java.util.List;
+import java.util.Map;
+
+/**
+ * Reader API that supports reading offset information from an underlying storage mechanism.
+ */
+public interface OffsetReader extends Closeable {
+
+  /**
+   * Reads the last stored offsets.
+   *
+   * @return the last stored offsets. If there are no stored offsets, an empty collection will be returned.
+   * @throws IOException if there is an error reading from the underlying storage.
+   */
+  Map<TopicPartition, Long> readLatestOffsets() throws IOException;
+
+  /**
+   * Reads the offsets for a given {@code persistedOffsetTime}. Note that not all storage mechanisms support
+   * complete historical offset information. Use {@link #getStoredOffsetPersistenceTimes()} to find valid values
+   * to specify for {@code persistedOffsetTime}.
+   *
+   * @param persistedOffsetTime the persistence time when offsets were written to the underlying storage system.
+   * @return the offsets persisted at the specified {@code persistedOffsetTime}. If no offsets were persisted
+   * at that time or none are available to be retrieved, then {@code null} will be returned.
+   * @throws IOException if there is an error reading from the underlying storage.
+   */
+  Map<TopicPartition, Long> readOffsets(long persistedOffsetTime) throws IOException;
+
+  /**
+   * Returns the list of persistence times at which offsets have been written to the underlying storage mechanism.
+   * The list of available persistence times is returned in order from earliest to latest.
+   *
+   * @return the collection of persistence times in the form of milliseconds since epoch. If there are no historical
+   * persistence times, then an empty list is returned.
+   * @throws IOException if there is an error reading from the underlying storage.
+   */
+  List<Long> getStoredOffsetPersistenceTimes() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetWriter.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetWriter.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetWriter.java
new file mode 100644
index 0000000..0f0056e
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetWriter.java
@@ -0,0 +1,34 @@
+package org.apache.crunch.kafka.offset;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Writer for persisting offset information.
+ */
+public interface OffsetWriter extends Closeable {
+
+  /**
+   * Persists the {@code offsets} to a configured location, with the current time used as the as-of time.
+   *
+   * @param offsets the offsets to persist
+   * @throws IllegalArgumentException if the {@code offsets} are {@code null}.
+   * @throws IOException if there is an error persisting the offsets.
+   */
+  void write(Map<TopicPartition, Long> offsets) throws IOException;
+
+  /**
+   * Persists the {@code offsets} to a configured location, with metadata of {@code asOfTime} indicating
+   * the time in milliseconds when the offsets were meaningful.
+   *
+   * @param asOfTime the time, given in milliseconds since epoch, as of which the offsets are
+   *                 accurate.
+   * @param offsets the offsets to persist
+   * @throws IllegalArgumentException if the {@code offsets} are {@code null} or the {@code asOfTime} is less than 0.
+   * @throws IOException if there is an error persisting the offsets.
+   */
+  void write(long asOfTime, Map<TopicPartition, Long> offsets) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.java
new file mode 100644
index 0000000..a497570
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.java
@@ -0,0 +1,142 @@
+package org.apache.crunch.kafka.offset.hdfs;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.crunch.kafka.offset.AbstractOffsetReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reader implementation that reads offset information from HDFS.
+ */
+public class HDFSOffsetReader extends AbstractOffsetReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSOffsetReader.class);
+
+  private final Configuration config;
+  private final Path baseOffsetStoragePath;
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * Creates a reader instance for interacting with the storage specified by the {@code config} and with
+   * the base storage path of {@code baseOffsetStoragePath}.
+   *
+   * @param config the config for interacting with the underlying data store.
+   * @param baseOffsetStoragePath the base storage path for offset information. If the path does not exist,
+   *                              no stored offsets will be found.
+   * @throws IllegalArgumentException if either argument is {@code null}.
+   */
+  public HDFSOffsetReader(Configuration config, Path baseOffsetStoragePath) {
+    if (config == null) {
+      throw new IllegalArgumentException("The 'config' cannot be 'null'.");
+    }
+    if (baseOffsetStoragePath == null) {
+      throw new IllegalArgumentException("The 'baseOffsetStoragePath' cannot be 'null'.");
+    }
+    this.config = config;
+    this.baseOffsetStoragePath = baseOffsetStoragePath;
+  }
+
+  @Override
+  public Map<TopicPartition, Long> readLatestOffsets() throws IOException {
+    List<Long> storedOffsetPersistenceTimes = getStoredOffsetPersistenceTimes(true);
+    if (storedOffsetPersistenceTimes.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    long persistedTime = storedOffsetPersistenceTimes.get(0);
+
+    Map<TopicPartition, Long> offsets = readOffsets(persistedTime);
+
+    return offsets == null ?
Collections.<TopicPartition, Long>emptyMap() : offsets;
+  }
+
+  @Override
+  public Map<TopicPartition, Long> readOffsets(long persistedOffsetTime) throws IOException {
+    Path offsetFilePath = HDFSOffsetWriter.getPersistedTimeStoragePath(baseOffsetStoragePath, persistedOffsetTime);
+
+    FileSystem fs = getFileSystem();
+    if (fs.isFile(offsetFilePath)) {
+      InputStream inputStream = fs.open(offsetFilePath);
+      try {
+        Offsets offsets = MAPPER.readValue(inputStream, Offsets.class);
+        Map<TopicPartition, Long> partitionsMap = new HashMap<>();
+        for (Offsets.PartitionOffset partitionOffset : offsets.getOffsets()) {
+          partitionsMap.put(new TopicPartition(partitionOffset.getTopic(), partitionOffset.getPartition()),
+              partitionOffset.getOffset());
+        }
+        return partitionsMap;
+      } finally {
+        inputStream.close();
+      }
+    }
+
+    LOG.error("Offset file at {} is not a file or does not exist.", offsetFilePath);
+    return null;
+  }
+
+  @Override
+  public List<Long> getStoredOffsetPersistenceTimes() throws IOException {
+    return getStoredOffsetPersistenceTimes(false);
+  }
+
+  private List<Long> getStoredOffsetPersistenceTimes(boolean newestFirst) throws IOException {
+    List<Long> persistedTimes = new LinkedList<>();
+    FileSystem fs = getFileSystem();
+    try {
+      FileStatus[] fileStatuses = fs.listStatus(baseOffsetStoragePath);
+      for (FileStatus status : fileStatuses) {
+        if (status.isFile()) {
+          String fileName = status.getPath().getName();
+          try {
+            persistedTimes.add(HDFSOffsetWriter.fileNameToPersistenceTime(fileName));
+          } catch (IllegalArgumentException iae) {
+            LOG.info("Skipping file {} due to filename not being of the correct format.", status.getPath(),
+                iae);
+          }
+        } else {
+          LOG.info("Skipping {} because it is not a file.", status.getPath());
+        }
+      }
+    } catch (FileNotFoundException fnfe) {
+      LOG.error("Unable to retrieve prior offsets.", fnfe);
+    }
+
+    // Natural order puts the oldest (smallest long) first; reverseOrder() puts the newest first.
+    if (newestFirst) {
+      Collections.sort(persistedTimes, Collections.reverseOrder());
+    } else {
+      Collections.sort(persistedTimes);
+    }
+    return Collections.unmodifiableList(persistedTimes);
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  /**
+   * Returns the {@link FileSystem} instance for reading data. Callers are not responsible for closing the instance.
+   *
+   * @return the {@link FileSystem} instance for reading data.
+   * @throws IOException error retrieving the underlying file system.
+ */ + protected FileSystem getFileSystem() throws IOException { + return FileSystem.get(config); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.java new file mode 100644 index 0000000..9762d1d --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.java @@ -0,0 +1,165 @@ +package org.apache.crunch.kafka.offset.hdfs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.StringUtils; +import org.apache.crunch.kafka.offset.AbstractOffsetWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.common.TopicPartition; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Offset writer implementation that stores the offsets in HDFS. + */ +public class HDFSOffsetWriter extends AbstractOffsetWriter { + + private static final Logger LOG = LoggerFactory.getLogger(HDFSOffsetWriter.class); + + /** + * Custom formatter for translating the times into valid file names. + */ + public static final String PERSIST_TIME_FORMAT = "yyyy-MM-dd'T'HH-mm-ssZ"; + + /** + * Formatter to use when creating the file names in a URI compliant format. + */ + public static final DateTimeFormatter FILE_FORMATTER = DateTimeFormat.forPattern(PERSIST_TIME_FORMAT).withZoneUTC(); + + /** + * File extension for storing the offsets. + */ + public static final String FILE_FORMAT_EXTENSION = ".json"; + + /** + * Configuration for the underlying storage. + */ + private final Configuration config; + + /** + * Mapper for converting data into JSON + */ + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** + * Base storage path for offset data + */ + private final Path baseStoragePath; + + /** + * Creates a writer instance for interacting with the storage specified by the {@code config} and with + * the base storage path of {@code baseStoragePath}. + * + * @param config the config for interacting with the underlying data store. + * @param baseStoragePath the base storage path for offset information. + * @throws IllegalArgumentException if either argument is {@code null}. 
+ */ + public HDFSOffsetWriter(Configuration config, Path baseStoragePath) { + if (config == null) { + throw new IllegalArgumentException("The 'config' cannot be 'null'."); + } + if (baseStoragePath == null) { + throw new IllegalArgumentException("The 'baseStoragePath' cannot be 'null'."); + } + this.config = config; + this.baseStoragePath = baseStoragePath; + } + + @Override + public void write(long asOfTime, Map<TopicPartition, Long> offsets) throws IOException { + if (offsets == null) { + throw new IllegalArgumentException("The 'offsets' cannot be 'null'."); + } + if (asOfTime < 0) { + throw new IllegalArgumentException("The 'asOfTime' cannot be less than 0."); + } + List<Offsets.PartitionOffset> partitionOffsets = new LinkedList<>(); + for(Map.Entry<TopicPartition, Long> entry: offsets.entrySet()){ + partitionOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setOffset(entry.getValue()) + .setTopic(entry.getKey().topic()) + .setPartition(entry.getKey().partition()).build()); + } + + Offsets storageOffsets = Offsets.Builder.newBuilder().setOffsets(partitionOffsets) + .setAsOfTime(asOfTime).build(); + + FileSystem fs = getFileSystem(); + Path offsetPath = getPersistedTimeStoragePath(baseStoragePath, asOfTime); + LOG.debug("Writing offsets to {} with as of time {}", offsetPath, asOfTime); + try (FSDataOutputStream fsDataOutputStream = fs.create(getPersistedTimeStoragePath(baseStoragePath, asOfTime), true)) { + MAPPER.writeValue(fsDataOutputStream, storageOffsets); + fsDataOutputStream.flush(); + } + LOG.debug("Completed writing offsets to {}", offsetPath); + } + + @Override + public void close() throws IOException { + //no-op + } + + /** + * Returns the {@link FileSystem} instance for writing data. Callers are not responsible for closing the instance. + * + * @return the {@link FileSystem} instance for writing data. + * @throws IOException error retrieving underlying file system. + */ + protected FileSystem getFileSystem() throws IOException { + return FileSystem.get(config); + } + + /** + * Creates a {@link Path} for storing the offsets for a specified {@code persistedTime}. + * + * @param baseStoragePath The base path the offsets will be stored at. + * @param persistedTime the time of the data being persisted. + * @return The path to where the offset information should be stored. + * @throws IllegalArgumentException if the {@code baseStoragePath} is {@code null}. + */ + public static Path getPersistedTimeStoragePath(Path baseStoragePath, long persistedTime) { + if (baseStoragePath == null) { + throw new IllegalArgumentException("The 'baseStoragePath' cannot be 'null'."); + } + return new Path(baseStoragePath, persistenceTimeToFileName(persistedTime)); + } + + /** + * Converts a {@code fileName} into the time the offsets were persisted. + * + * @param fileName the file name to parse. + * @return the time in milliseconds since epoch that the offsets were stored. + * @throws IllegalArgumentException if the {@code fileName} is not of the correct format or is {@code null} or + * empty. + */ + public static long fileNameToPersistenceTime(String fileName) { + if (StringUtils.isBlank(fileName)) { + throw new IllegalArgumentException("the 'fileName' cannot be 'null' or empty"); + } + String formattedTimeString = StringUtils.strip(fileName, FILE_FORMAT_EXTENSION); + DateTime persistedTime = FILE_FORMATTER.parseDateTime(formattedTimeString); + return persistedTime.getMillis(); + } + + /** + * Converts a {@code persistedTime} into a file name for persisting the offsets. 
+ * + * @param persistedTime the persisted time to use to generate the file name. + * @return the file name to use when persisting the data. + */ + public static String persistenceTimeToFileName(long persistedTime) { + DateTime dateTime = new DateTime(persistedTime, DateTimeZone.UTC); + String formattedTime = FILE_FORMATTER.print(dateTime); + return formattedTime + FILE_FORMAT_EXTENSION; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java new file mode 100644 index 0000000..e5c80e0 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java @@ -0,0 +1,316 @@ +package org.apache.crunch.kafka.offset.hdfs; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import kafka.api.OffsetRequest; +import org.apache.commons.lang.StringUtils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Simple object to represent a collection of Kafka Topic and Partition offset information to make storing + * this information easier. + */ +@JsonDeserialize(builder = Offsets.Builder.class) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class Offsets { + + private final long offsetsAsOfTime; + + private final List<PartitionOffset> offsets; + + private Offsets(long asOfTime, List<PartitionOffset> offsets) { + offsetsAsOfTime = asOfTime; + this.offsets = offsets; + } + + /** + * Returns the time in milliseconds since epoch that the offset information was retrieved or valid as of. + * + * @return the time in milliseconds since epoch that the offset information was retrieved or valid as of. + */ + @JsonProperty("asOfTime") + public long getAsOfTime() { + return offsetsAsOfTime; + } + + /** + * The collection of offset information for specific topics and partitions. + * + * @return collection of offset information for specific topics and partitions. + */ + @JsonProperty("offsets") + public List<PartitionOffset> getOffsets() { + return offsets; + } + + @Override + public int hashCode() { + return Objects.hash(offsetsAsOfTime, offsets); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj instanceof Offsets) { + Offsets that = (Offsets) obj; + + return this.offsetsAsOfTime == that.offsetsAsOfTime + && this.offsets.equals(that.offsets); + } + + return false; + } + + + /** + * Builder for the {@link Offsets}. + */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Builder { + + private long asOf = -1; + private List<PartitionOffset> offsets = Collections.emptyList(); + + /** + * Creates a new Builder instance. + * + * @return a new Builder instance. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Sets the as of time for the collection of offsets. + * + * @param asOfTime the as of time for the offsets. + * @return builder instance + * @throws IllegalArgumentException if the {@code asOfTime} is less than 0. 
+ */ + @JsonProperty("asOfTime") + public Builder setAsOfTime(long asOfTime) { + if (asOfTime < 0) { + throw new IllegalArgumentException("The 'asOfTime' cannot be less than 0."); + } + this.asOf = asOfTime; + return this; + } + + /** + * Sets the collection of offsets. + * + * @param offsets the collection of offsets + * @return builder instance + * @throws IllegalArgumentException if the {@code offsets} is {@code null}. + */ + @JsonProperty("offsets") + public Builder setOffsets(List<PartitionOffset> offsets) { + if (offsets == null) { + throw new IllegalArgumentException("The 'offsets' cannot be 'null'."); + } + List<PartitionOffset> sortedOffsets = new LinkedList<>(offsets); + + Collections.sort(sortedOffsets); + + this.offsets = Collections.unmodifiableList(sortedOffsets); + + return this; + } + + /** + * Builds an instance. + * + * @return a built instance + * @throws IllegalStateException if the {@link #setAsOfTime(long) asOfTime} is not set or the specified + * {@link #setOffsets(List) offsets} contains duplicate entries for a topic partition. + */ + public Offsets build() { + if (asOf < 0) { + throw new IllegalStateException("The 'asOfTime' cannot be less than 0."); + } + + Set<String> uniqueTopicPartitions = new HashSet<>(); + for(PartitionOffset partitionOffset : offsets){ + uniqueTopicPartitions.add(partitionOffset.getTopic()+partitionOffset.getPartition()); + } + + if (uniqueTopicPartitions.size() != offsets.size()) { + throw new IllegalStateException("The 'offsets' contains duplicate entries for a topic and partition."); + } + + return new Offsets(asOf, offsets); + } + } + + + /** + * Simple object that represents a specific topic, partition, and its offset value. + */ + @JsonDeserialize(builder = PartitionOffset.Builder.class) + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class PartitionOffset implements Comparable<PartitionOffset> { + + private final String topic; + private final int partition; + private final long offset; + + private PartitionOffset(String topic, int partition, long offset) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + } + + /** + * Returns the topic + * + * @return the topic + */ + public String getTopic() { + return topic; + } + + /** + * Returns the partition + * + * @return the partition + */ + public int getPartition() { + return partition; + } + + /** + * Returns the offset + * + * @return the offset + */ + public long getOffset() { + return offset; + } + + @Override + public int compareTo(PartitionOffset other) { + int compare = topic.compareTo(other.topic); + if (compare == 0) { + compare = Integer.compare(partition, other.partition); + if (compare == 0) { + return Long.compare(offset, other.offset); + } + } + return compare; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj instanceof PartitionOffset) { + PartitionOffset that = (PartitionOffset) obj; + + return compareTo(that) == 0; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(topic, partition, offset); + } + + /** + * Builder for {@link PartitionOffset} + */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Builder { + + private String topic; + private int partition = -1; + private long offset = OffsetRequest.EarliestTime(); + + /** + * Creates a new builder instance. + * + * @return a new builder instance. 
+ */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Set the {@code topic} for the partition offset being built + * + * @param topic the topic for the partition offset being built. + * @return builder instance + * @throws IllegalArgumentException if the {@code topic} is {@code null} or empty. + */ + @JsonProperty("topic") + public Builder setTopic(String topic) { + if (StringUtils.isBlank(topic)) { + throw new IllegalArgumentException("The 'topic' cannot be null or empty."); + } + this.topic = topic; + return this; + } + + /** + * Set the {@code partition} for the partition offset being built + * + * @param partition the partition for the partition offset being built. + * @return builder instance + * @throws IllegalArgumentException if the {@code partition} is less than 0. + */ + @JsonProperty("partition") + public Builder setPartition(int partition) { + if (partition < 0) { + throw new IllegalArgumentException("The 'partition' cannot be less than 0."); + } + this.partition = partition; + return this; + } + + /** + * Set the {@code offset} for the partition offset being built. If the {@code offset} is not + * set then it defaults to {@link OffsetRequest#EarliestTime()}. + * + * @param offset the topic for the partition offset being built. + * @return builder instance + */ + @JsonProperty("offset") + public Builder setOffset(long offset) { + this.offset = offset; + return this; + } + + /** + * Builds a PartitionOffset instance. + * + * @return the built PartitionOffset instance. + * @throws IllegalStateException if the {@code topic} or {@code partition} are never set or configured + * to invalid values. + */ + public PartitionOffset build() { + if (StringUtils.isBlank(topic)) { + throw new IllegalStateException("The 'topic' cannot be null or empty."); + } + + if (partition < 0) { + throw new IllegalStateException("The 'partition' cannot be less than 0."); + } + + return new PartitionOffset(topic, partition, offset); + } + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReaderTest.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReaderTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReaderTest.java new file mode 100644 index 0000000..faead74 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReaderTest.java @@ -0,0 +1,220 @@ +package org.apache.crunch.kafka.offset.hdfs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.crunch.kafka.offset.OffsetReader; +import org.apache.crunch.kafka.offset.OffsetWriter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.common.TopicPartition; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class HDFSOffsetReaderTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule 
+ public TestName testName = new TestName(); + + private Path basePath; + private FileSystem fileSystem; + private OffsetWriter writer; + private OffsetReader reader; + + + @Before + public void setup() throws IOException { + Configuration config = new Configuration(); + config.set(FileSystem.DEFAULT_FS, tempFolder.newFolder().getAbsolutePath()); + + fileSystem = FileSystem.newInstance(config); + basePath = new Path(tempFolder.newFolder().toString(), testName.getMethodName()); + + writer = new HDFSOffsetWriter(config, basePath); + + reader = new HDFSOffsetReader(config, basePath); + } + + @After + public void cleanup() throws IOException { + writer.close(); + reader.close(); + fileSystem.close(); + } + + @Test(expected = IllegalArgumentException.class) + public void constructNullConfig() { + new HDFSOffsetReader(null, new Path("/")); + } + + @Test(expected = IllegalArgumentException.class) + public void constructNullPath() { + new HDFSOffsetReader(new Configuration(), null); + } + + @Test + public void getStoredOffsetPersistenceTimesNoValues() throws IOException { + List<Long> storedOffsetPersistenceTimes = reader.getStoredOffsetPersistenceTimes(); + assertThat(storedOffsetPersistenceTimes, is(Collections.<Long>emptyList())); + } + + @Test + public void getStoredOffsetPersistenceTimesMultipleValues() throws IOException { + long current = 1464992662000L; + List<Long> persistedTimes = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + persistedTimes.add(current + (i * 18000)); + } + + for (Long t : persistedTimes) { + try { + writer.write(t, Collections.<TopicPartition, Long>emptyMap()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + List<Long> storedTimes = reader.getStoredOffsetPersistenceTimes(); + + assertThat(storedTimes, is(persistedTimes)); + } + + @Test + public void readOffsetNoMatchForTime() throws IOException { + Map<TopicPartition, Long> offsets = reader.readOffsets(12345L); + assertThat(offsets, is(nullValue())); + } + + @Test + public void readOffsetLatestNone() throws IOException { + assertThat(reader.readLatestOffsets(), is(Collections.<TopicPartition, Long>emptyMap())); + } + + @Test + public void readOffsetLatest() throws IOException { + long current = 1464992662000L; + List<Long> persistedTimes = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + persistedTimes.add(current + (i * 18000)); + } + + for (Long t : persistedTimes) { + try { + writer.write(t, Collections.<TopicPartition, Long>emptyMap()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + long expectedTime = persistedTimes.get(persistedTimes.size() - 1); + + Map<TopicPartition, Long> offsets = new HashMap<>(); + for (int i = 0; i < 9; i++) { + for (int j = 0; j < 5; j++) { + offsets.put(new TopicPartition("topic" + i, j), (long) j); + } + } + + writer.write(expectedTime, offsets); + + Map<TopicPartition, Long> retrievedOffsets = reader.readLatestOffsets(); + + assertThat(retrievedOffsets, is(offsets)); + } + + + @Test + public void readOffsetForTime() throws IOException { + long current = 1464992662000L; + List<Long> persistedTimes = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + persistedTimes.add(current + (i * 18000)); + } + for (Long t : persistedTimes) { + try { + writer.write(t, Collections.<TopicPartition, Long>emptyMap()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + long expectedTime = persistedTimes.get(2); + + Map<TopicPartition, Long> offsets = new HashMap<>(); + for (int i = 0; i < 9; i++) { + for (int j = 0; j < 5; j++) { + 
offsets.put(new TopicPartition("topic" + i, j), (long) j); + } + } + + writer.write(expectedTime, offsets); + + Map<TopicPartition, Long> retrievedOffsets = reader.readOffsets(expectedTime); + + assertThat(retrievedOffsets, is(offsets)); + } + + + @Test + public void skipReadingDirectory() throws IOException { + long current = 1464992662000L; + List<Long> persistedTimes = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + persistedTimes.add(current + (i * 18000)); + } + + for (Long t : persistedTimes) { + try { + writer.write(t, Collections.<TopicPartition, Long>emptyMap()); + } catch (IOException e) { + e.printStackTrace(); + } + } + fileSystem.mkdirs(new Path(basePath, "imadirectory")); + + List<Long> storedTimes = reader.getStoredOffsetPersistenceTimes(); + + assertThat(storedTimes, is(persistedTimes)); + } + + @Test + public void skipInvalidFile() throws IOException { + long current = 1464992662000L; + List<Long> persistedTimes = new LinkedList<>(); + for (int i = 0; i < 10; i++) { + persistedTimes.add(current + (i * 18000)); + } + + for (Long t : persistedTimes) { + try { + writer.write(t, Collections.<TopicPartition, Long>emptyMap()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + fileSystem.createNewFile(new Path(basePath, "imabadfile.json")); + fileSystem.createNewFile(new Path(basePath, "imabadfile.txt")); + + List<Long> storedTimes = reader.getStoredOffsetPersistenceTimes(); + + assertThat(storedTimes, is(persistedTimes)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriterTest.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriterTest.java new file mode 100644 index 0000000..70922d8 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriterTest.java @@ -0,0 +1,185 @@ +package org.apache.crunch.kafka.offset.hdfs; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.common.TopicPartition; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class HDFSOffsetWriterTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule + public TestName testName = new TestName(); + + private Configuration config; + + private Path basePath; + private FileSystem fileSystem; + private HDFSOffsetWriter writer; + + + @Before + public void setup() throws IOException { + config = new Configuration(); + config.set(FileSystem.DEFAULT_FS, tempFolder.newFolder().getAbsolutePath()); + + fileSystem = FileSystem.newInstance(config); + basePath = new Path(tempFolder.newFolder().toString(), testName.getMethodName()); + + writer = new HDFSOffsetWriter(config, 
basePath); + } + + @After + public void cleanup() throws IOException { + writer.close(); + fileSystem.close(); + } + + @Test(expected = IllegalArgumentException.class) + public void constructNullConfig() { + new HDFSOffsetWriter(null, new Path("/")); + } + + @Test(expected = IllegalArgumentException.class) + public void constructNullPath() { + new HDFSOffsetWriter(new Configuration(), null); + } + + @Test(expected = IllegalArgumentException.class) + public void writeNullOffsets() throws IOException { + writer.write(10L, null); + } + + @Test(expected = IllegalArgumentException.class) + public void writeNullInvalidAsOfTime() throws IOException { + writer.write(-1L, Collections.<TopicPartition, Long>emptyMap()); + } + + @Test + public void writeEmptyOffsets() throws IOException { + long persistTime = System.currentTimeMillis(); + Map<TopicPartition, Long> offsets = Collections.emptyMap(); + + writer.write(persistTime, offsets); + + Path expectedPath = HDFSOffsetWriter.getPersistedTimeStoragePath(basePath, persistTime); + + try (InputStream in = fileSystem.open(expectedPath)) { + Offsets persistedOffsets = MAPPER.readValue(in, Offsets.class); + assertThat(persistedOffsets.getAsOfTime(), is(persistTime)); + assertThat(persistedOffsets.getOffsets(), is(Collections.<Offsets.PartitionOffset>emptyList())); + } + } + + @Test + public void writeOffsets() throws IOException { + long persistTime = System.currentTimeMillis(); + Map<TopicPartition, Long> offsets = new HashMap<>(); + + for (int i = 0; i < 9; i++) { + for (int j = 0; j < 5; j++) { + offsets.put(new TopicPartition("topic" + i, j), (long) j); + } + + } + + writer.write(persistTime, offsets); + + Path expectedPath = HDFSOffsetWriter.getPersistedTimeStoragePath(basePath, persistTime); + + try (InputStream in = fileSystem.open(expectedPath)) { + Offsets persistedOffsets = MAPPER.readValue(in, Offsets.class); + assertThat(persistedOffsets.getAsOfTime(), is(persistTime)); + assertThat(persistedOffsets.getOffsets().size(), is(offsets.size())); + + Iterator<Offsets.PartitionOffset> partitionOffsets = persistedOffsets.getOffsets().iterator(); + for (int i = 0; i < 9; i++) { + for (int j = 0; j < 5; j++) { + assertTrue(partitionOffsets.hasNext()); + Offsets.PartitionOffset partitionOffset = partitionOffsets.next(); + assertThat(partitionOffset.getPartition(), is(j)); + assertThat(partitionOffset.getOffset(), is((long) j)); + assertThat(partitionOffset.getTopic(), is("topic" + i)); + } + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void getPersistedStoragePathNullBase() { + HDFSOffsetWriter.getPersistedTimeStoragePath(null, 10L); + } + + @Test + public void getPersistedStoragePath() { + //Timestamp of 02 Jun 2016 20:12:17 GMT + //2016-06-02T20:12:17Z + long timestamp = 1464898337000L; + + String expectedFileName = HDFSOffsetWriter.FILE_FORMATTER.print(timestamp) + + HDFSOffsetWriter.FILE_FORMAT_EXTENSION; + Path filePath = HDFSOffsetWriter.getPersistedTimeStoragePath(basePath, timestamp); + + assertThat(filePath, is(new Path(basePath, expectedFileName))); + } + + @Test + public void timeToFileName() { + //Timestamp of 02 Jun 2016 20:12:17 GMT + //2016-06-02T20:12:17Z + long timestamp = 1464898337000L; + + String expectedFileName = "2016-06-02T20-12-17+0000" + HDFSOffsetWriter.FILE_FORMAT_EXTENSION; + + assertThat(HDFSOffsetWriter.persistenceTimeToFileName(timestamp), is(expectedFileName)); + } + + @Test + public void fileNameToTime() { + //Timestamp of 02 Jun 2016 20:12:17 GMT + //2016-06-02T20:12:17Z + long timestamp = 
1464898337000L; + + String expectedFileName = "2016-06-02T20-12-17+0000" + HDFSOffsetWriter.FILE_FORMAT_EXTENSION; + + assertThat(HDFSOffsetWriter.fileNameToPersistenceTime(expectedFileName), is(timestamp)); + } + + @Test(expected = IllegalArgumentException.class) + public void fileNameToTimeNullFileName() { + HDFSOffsetWriter.fileNameToPersistenceTime(null); + } + + @Test(expected = IllegalArgumentException.class) + public void fileNameToTimeEmptyFileName() { + HDFSOffsetWriter.fileNameToPersistenceTime(""); + } + + @Test(expected = IllegalArgumentException.class) + public void fileNameToTimeInvalidFileName() { + HDFSOffsetWriter.fileNameToPersistenceTime("2016-06-02T20:12:17.000Z.json"); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java new file mode 100644 index 0000000..e976da8 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java @@ -0,0 +1,418 @@ +package org.apache.crunch.kafka.offset.hdfs; + +import com.fasterxml.jackson.databind.ObjectMapper; +import kafka.api.OffsetRequest; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class OffsetsTest { + + @Rule + public TestName testName = new TestName(); + + private static ObjectMapper mapper; + + @BeforeClass + public static void setup() { + mapper = new ObjectMapper(); + } + + @Test(expected = IllegalArgumentException.class) + public void buildOffsetNullOffsets() { + Offsets.Builder.newBuilder().setOffsets(null); + } + + @Test(expected = IllegalArgumentException.class) + public void buildInvalidAsOfTime() { + Offsets.Builder.newBuilder().setAsOfTime(-1); + } + + @Test(expected = IllegalStateException.class) + public void buildNoAsOfTime() { + Offsets.Builder.newBuilder().build(); + } + + @Test(expected = IllegalArgumentException.class) + public void buildPartitionNullTopic() { + Offsets.PartitionOffset.Builder.newBuilder().setTopic(null); + } + + @Test(expected = IllegalArgumentException.class) + public void buildPartitionEmptyTopic() { + Offsets.PartitionOffset.Builder.newBuilder().setTopic(" "); + } + + @Test(expected = IllegalArgumentException.class) + public void buildPartitionInvalidPartition() { + Offsets.PartitionOffset.Builder.newBuilder().setPartition(-1); + } + + @Test(expected = IllegalStateException.class) + public void buildPartitionNoTopicSet() { + Offsets.PartitionOffset.Builder.newBuilder().setPartition(10).setOffset(10L).build(); + } + + @Test(expected = IllegalStateException.class) + public void buildPartitionNoPartitionSet() { + Offsets.PartitionOffset.Builder.newBuilder().setTopic(testName.getMethodName()).setOffset(10L).build(); + } + + @Test + public void buildPartitionOffset() { + Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder() + 
.setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset.getOffset(), is(10L)); + assertThat(partitionOffset.getPartition(), is(1)); + assertThat(partitionOffset.getTopic(), is(testName.getMethodName())); + } + + @Test + public void buildPartitionOffsetNoOffsetSet() { + Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setPartition(1).build(); + + assertThat(partitionOffset.getOffset(), is(OffsetRequest.EarliestTime())); + assertThat(partitionOffset.getPartition(), is(1)); + assertThat(partitionOffset.getTopic(), is(testName.getMethodName())); + } + + @Test + public void partitionOffsetSame() { + Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset.equals(partitionOffset), is(true)); + assertThat(partitionOffset.compareTo(partitionOffset), is(0)); + } + + @Test + public void partitionOffsetEqual() { + Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset1.equals(partitionOffset2), is(true)); + assertThat(partitionOffset1.compareTo(partitionOffset2), is(0)); + } + + @Test + public void partitionOffsetNotEqualDiffTopic() { + Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic("abc").setOffset(10L).setPartition(1).build(); + + Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset1.equals(partitionOffset2), is(false)); + assertThat(partitionOffset1.compareTo(partitionOffset2), is(lessThan(0))); + } + + @Test + public void partitionOffsetNotEqualDiffPartition() { + Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(0).build(); + + Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset1.equals(partitionOffset2), is(false)); + assertThat(partitionOffset1.compareTo(partitionOffset2), is(lessThan(0))); + } + + @Test + public void partitionOffsetNotEqualDiffOffset() { + Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(9L).setPartition(1).build(); + + Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset1.equals(partitionOffset2), is(false)); + assertThat(partitionOffset1.compareTo(partitionOffset2), is(lessThan(0))); + } + + + @Test + public void partitionOffsetNotEqualDiffGreaterTopic() { + Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder() + 
.setTopic("abc").setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset1.equals(partitionOffset2), is(false)); + assertThat(partitionOffset1.compareTo(partitionOffset2), is(greaterThan(0))); + } + + @Test + public void partitionOffsetNotEqualDiffGreaterPartition() { + Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(2).build(); + + Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset1.equals(partitionOffset2), is(false)); + assertThat(partitionOffset1.compareTo(partitionOffset2), is(greaterThan(0))); + } + + @Test + public void partitionOffsetNotEqualDiffGreaterOffset() { + Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(12L).setPartition(1).build(); + + Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build(); + + assertThat(partitionOffset1.equals(partitionOffset2), is(false)); + assertThat(partitionOffset1.compareTo(partitionOffset2), is(greaterThan(0))); + } + + @Test + public void jsonSerializationPartitionOffset() throws IOException { + Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder() + .setTopic(testName.getMethodName()).setOffset(12L).setPartition(1).build(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + mapper.writeValue(baos, partitionOffset); + + Offsets.PartitionOffset readOffset = mapper.readValue(baos.toByteArray(), Offsets.PartitionOffset.class); + + assertThat(readOffset, is(partitionOffset)); + } + + + @Test + public void buildOffsetsNoOffsets() { + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + assertThat(offsets.getOffsets(), is(Collections.<Offsets.PartitionOffset>emptyList())); + } + + @Test + public void buildOffsetsSortOffsets() { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>(); + for(int i = 0; i < 9; i++){ + reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + List<Offsets.PartitionOffset> returnedOffsets = offsets.getOffsets(); + int count = 1; + + //iterate in the expected order + for (Offsets.PartitionOffset o : returnedOffsets) { + assertThat(o.getTopic(), is("topic" + count)); + assertThat(o.getPartition(), is(partition)); + assertThat(o.getOffset(), is(offset)); + count++; + } + assertThat(count, is(10)); + } + + @Test + public void offsetsSame() { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>(); + for(int i = 0; i < 9; i++){ + reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + assertThat(offsets.equals(offsets), is(true)); + } + + @Test + public void 
offsetsEqual() { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>(); + for(int i = 0; i < 9; i++){ + reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + assertThat(offsets.equals(offsets2), is(true)); + } + + @Test + public void offsetsDiffAsOfTime() { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>(); + for(int i = 0; i < 9; i++){ + reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(11).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + assertThat(offsets.equals(offsets2), is(false)); + } + + @Test + public void offsetsDiffOffsets() { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>(); + for(int i = 0; i < 9; i++){ + reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + List<Offsets.PartitionOffset> secondOffsets = new LinkedList<>(); + for(int i = 0; i < 5; i++){ + secondOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(secondOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + assertThat(offsets.equals(offsets2), is(false)); + } + + @Test(expected = IllegalStateException.class) + public void offsetsDuplicates() { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>(); + for(int i = 0; i < 9; i++){ + reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic9").setPartition(0).build()); + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + } + + @Test + public void offsetsDiffListInstances() { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>(); + for(int i = 0; i < 9; i++){ + reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + List<Offsets.PartitionOffset> secondOffsets = new LinkedList<>(); + for(int i = 0; i < 9; i++){ + secondOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i)) + .setPartition(partition).setOffset(offset).build()); + } + + + 
Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(secondOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + assertThat(offsets.equals(offsets2), is(true)); + } + + @Test + public void offsetsEqualEmptyOffsets() { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>(); + + List<Offsets.PartitionOffset> secondOffsets = new LinkedList<>(); + + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(secondOffsets).build(); + assertThat(offsets.getAsOfTime(), is(10L)); + + assertThat(offsets.equals(offsets2), is(true)); + } + + @Test + public void jsonSerializationOffsets() throws IOException { + int partition = 0; + long offset = 10L; + + List<Offsets.PartitionOffset> partitionOffsets = new LinkedList<>(); + for(int i = 0; i < 100; i++){ + partitionOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + i) + .setPartition(partition).setOffset(offset).build()); + } + + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(partitionOffsets).build(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + mapper.writeValue(baos, offsets); + + Offsets readOffsets = mapper.readValue(baos.toByteArray(), Offsets.class); + + assertThat(readOffsets, is(offsets)); + } + + @Test + public void jsonSerializationOffsetsEmpty() throws IOException { + int partition = 0; + long offset = 10L; + + + Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10) + .setOffsets(Collections.<Offsets.PartitionOffset>emptyList()).build(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + mapper.writeValue(baos, offsets); + + Offsets readOffsets = mapper.readValue(baos.toByteArray(), Offsets.class); + + assertThat(readOffsets, is(offsets)); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 78ea085..47e58ba 100644 --- a/pom.xml +++ b/pom.xml @@ -90,6 +90,7 @@ under the License. <parquet.version>1.8.1</parquet.version> <javassist.version>3.16.1-GA</javassist.version> <jackson.version>1.8.8</jackson.version> + <jackson.databind.version>2.6.1</jackson.databind.version> <protobuf-java.version>2.5.0</protobuf-java.version> <libthrift.version>0.8.0</libthrift.version> <slf4j.version>1.6.1</slf4j.version> @@ -298,6 +299,24 @@ under the License. </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.databind.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.databind.version}</version> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.8.1</version> + </dependency> + + <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>${protobuf-java.version}</version>
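

----------------------------------------------------------------------
Usage sketch (illustrative only, not part of the committed patch): a minimal
example of how the OffsetWriter/OffsetReader API introduced by this commit
might be wired up with the HDFS-backed implementation. The class name
OffsetStorageExample, the base path, and the topic name below are
hypothetical; only the org.apache.crunch.kafka.offset types come from the
patch.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.crunch.kafka.offset.OffsetReader;
import org.apache.crunch.kafka.offset.OffsetWriter;
import org.apache.crunch.kafka.offset.hdfs.HDFSOffsetReader;
import org.apache.crunch.kafka.offset.hdfs.HDFSOffsetWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;

public class OffsetStorageExample {

  public static void main(String[] args) throws IOException {
    Configuration config = new Configuration();
    // Hypothetical base directory; each write lands under it as one JSON
    // file named for the as-of time, e.g. 2016-06-02T20-12-17+0000.json.
    Path basePath = new Path("/tmp/crunch/kafka/offsets");

    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new TopicPartition("my.topic", 0), 100L);
    offsets.put(new TopicPartition("my.topic", 1), 250L);

    OffsetWriter writer = new HDFSOffsetWriter(config, basePath);
    try {
      // The one-arg overload stamps the offsets with the current time;
      // write(long, Map) takes an explicit as-of time instead.
      writer.write(offsets);
    } finally {
      writer.close();
    }

    OffsetReader reader = new HDFSOffsetReader(config, basePath);
    try {
      // Most recently persisted offsets, e.g. to resume a pipeline.
      Map<TopicPartition, Long> latest = reader.readLatestOffsets();
      for (Map.Entry<TopicPartition, Long> entry : latest.entrySet()) {
        System.out.println(entry.getKey() + " -> " + entry.getValue());
      }

      // Historical snapshots: persistence times are returned oldest first
      // and can be passed back to readOffsets(long).
      for (Long time : reader.getStoredOffsetPersistenceTimes()) {
        Map<TopicPartition, Long> snapshot = reader.readOffsets(time);
        System.out.println(time + " -> " + snapshot.size() + " partitions");
      }
    } finally {
      reader.close();
    }
  }
}

The file-per-write layout is what makes the reader side work: the reader
enumerates the base directory and recovers each snapshot's time from its
file name via HDFSOffsetWriter.fileNameToPersistenceTime, so no separate
index of persistence times needs to be maintained.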
