http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 4b2190f..49e806d 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -25,10 +25,8 @@ import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.when;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-
 import java.io.IOException;
 import java.util.Collections;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,114 +40,112 @@ import org.mockito.stubbing.Answer;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardRecordsIteratorTest {
+  private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+  private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+  private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+  private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+  private static final String STREAM_NAME = "STREAM_NAME";
+  private static final String SHARD_ID = "SHARD_ID";
+
+  @Mock
+  private SimplifiedKinesisClient kinesisClient;
+  @Mock
+  private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+  @Mock
+  private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+  @Mock
+  private KinesisRecord a, b, c, d;
+  @Mock
+  private RecordFilter recordFilter;
+
+  private ShardRecordsIterator iterator;
+
+  @Before
+  public void setUp() throws IOException, TransientKinesisException {
+    when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+    when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+    when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+    when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+    when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+    when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+    when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+    when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+    when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+    when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(firstResult);
+    when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(secondResult);
+    when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(thirdResult);
+
+    when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+    when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+    when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+    when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+    when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+    when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+    when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint.class)))
+        .thenAnswer(new IdentityAnswer());
+
+    iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+  }
+
+  @Test
+  public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+  }
+
+  @Test
+  public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+    when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+    when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+    assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+    assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+    assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+    assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+  }
+
+  @Test
+  public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+    when(firstResult.getRecords()).thenReturn(singletonList(a));
+    when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+    when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenThrow(ExpiredIteratorException.class);
+    when(aCheckpoint.getShardIterator(kinesisClient))
+        .thenReturn(SECOND_REFRESHED_ITERATOR);
+    when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+        .thenReturn(secondResult);
+
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+  }
-    private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
-    private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
-    private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
-    private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
-    private static final String STREAM_NAME = "STREAM_NAME";
-    private static final String SHARD_ID = "SHARD_ID";
-
-    @Mock
-    private SimplifiedKinesisClient kinesisClient;
-    @Mock
-    private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
-    @Mock
-    private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
-    @Mock
-    private KinesisRecord a, b, c, d;
-    @Mock
-    private RecordFilter recordFilter;
-
-    private ShardRecordsIterator iterator;
-
-    @Before
-    public void setUp() throws IOException, TransientKinesisException {
-        when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
-        when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-        when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
-        when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
-        when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
-        when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
-        when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
-        when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
-        when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
-
-        when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
-            .thenReturn(firstResult);
-        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-            .thenReturn(secondResult);
-        when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
-            .thenReturn(thirdResult);
-
-        when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
-        when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-        when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
-
-        when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-        when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-        when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
-
-        when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
-            .class))).thenAnswer(new IdentityAnswer());
-
-        iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
-    }
-
-    @Test
-    public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    }
-
-    @Test
-    public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
-        when(firstResult.getRecords()).thenReturn(asList(a, b, c));
-        when(secondResult.getRecords()).thenReturn(singletonList(d));
-
-        assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-        assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-        assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
-        assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
-        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-    }
-
-    @Test
-    public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
-        when(firstResult.getRecords()).thenReturn(singletonList(a));
-        when(secondResult.getRecords()).thenReturn(singletonList(b));
-
-        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
-            .thenThrow(ExpiredIteratorException.class);
-        when(aCheckpoint.getShardIterator(kinesisClient))
-            .thenReturn(SECOND_REFRESHED_ITERATOR);
-        when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
-            .thenReturn(secondResult);
-
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    }
-
-    private static class IdentityAnswer implements Answer<Object> {
-
-        @Override
-        public Object answer(InvocationOnMock invocation) throws Throwable {
-            return invocation.getArguments()[0];
+  private static class IdentityAnswer implements Answer<Object> {
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      return invocation.getArguments()[0];
+    }
   }
-    }
 }
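
The refreshesExpiredIterator test above pins down the reader's recovery contract: when getRecords fails with ExpiredIteratorException, the iterator re-derives a fresh shard iterator from its last checkpoint and retries, so the caller never observes the expiration. A minimal standalone sketch of that retry loop follows; the RecordsClient interface, RefreshingShardReader class, and String record type are simplified stand-ins for illustration, not Beam's actual types.

    import java.util.List;
    import java.util.function.Supplier;

    // Simplified stand-in for the AWS exception; standalone for this sketch.
    class ExpiredIteratorException extends RuntimeException {}

    // Simplified stand-in for the Kinesis client.
    interface RecordsClient {
      List<String> getRecords(String shardIterator) throws ExpiredIteratorException;
    }

    class RefreshingShardReader {
      private final RecordsClient client;
      private final Supplier<String> checkpointIteratorSource;
      private String shardIterator;

      RefreshingShardReader(RecordsClient client, Supplier<String> checkpointIteratorSource) {
        this.client = client;
        this.checkpointIteratorSource = checkpointIteratorSource;
        this.shardIterator = checkpointIteratorSource.get();
      }

      // Reads one batch; if the server-side iterator has expired, re-derives it
      // from the last checkpointed position and retries once -- the behavior
      // that refreshesExpiredIterator asserts via SECOND_REFRESHED_ITERATOR.
      List<String> readNextBatch() {
        try {
          return client.getRecords(shardIterator);
        } catch (ExpiredIteratorException e) {
          shardIterator = checkpointIteratorSource.get();
          return client.getRecords(shardIterator);
        }
      }
    }

After a successful read, the real iterator also advances shardIterator from the result's next-shard-iterator field, which is what the INITIAL/SECOND/THIRD iterator constants in the test simulate.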
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 2f8757c..96434fd 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -34,9 +34,7 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
-
 import java.util.List;
-
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,180 +46,179 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class SimplifiedKinesisClientTest {
+  private static final String STREAM = "stream";
+  private static final String SHARD_1 = "shard-01";
+  private static final String SHARD_2 = "shard-02";
+  private static final String SHARD_3 = "shard-03";
+  private static final String SHARD_ITERATOR = "iterator";
+  private static final String SEQUENCE_NUMBER = "abc123";
+
+  @Mock
+  private AmazonKinesis kinesis;
+  @InjectMocks
+  private SimplifiedKinesisClient underTest;
+
+  @Test
+  public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+    given(kinesis.getShardIterator(new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+        .withStartingSequenceNumber(SEQUENCE_NUMBER)
+    )).willReturn(new GetShardIteratorResult()
+        .withShardIterator(SHARD_ITERATOR));
+
+    String stream = underTest.getShardIterator(STREAM, SHARD_1,
+        ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+    assertThat(stream).isEqualTo(SHARD_ITERATOR);
+  }
+
+  @Test
+  public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+    Instant timestamp = Instant.now();
+    given(kinesis.getShardIterator(new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+        .withTimestamp(timestamp.toDate())
+    )).willReturn(new GetShardIteratorResult()
+        .withShardIterator(SHARD_ITERATOR));
+
+    String stream = underTest.getShardIterator(STREAM, SHARD_1,
+        ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
+
+    assertThat(stream).isEqualTo(SHARD_ITERATOR);
+  }
+
+  @Test
+  public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+        ExpiredIteratorException.class);
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new LimitExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleServiceErrorForGetShardIterator() {
+    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleClientErrorForGetShardIterator() {
+    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class);
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new NullPointerException(),
+        RuntimeException.class);
+  }
+
+  private void shouldHandleGetShardIteratorError(
+      Exception thrownException,
+      Class<? extends Exception> expectedExceptionClass) {
+    GetShardIteratorRequest request = new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.LATEST);
+
+    given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+    try {
+      underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(kinesis);
+    }
+  }
+
+  @Test
+  public void shouldListAllShards() throws Exception {
+    Shard shard1 = new Shard().withShardId(SHARD_1);
+    Shard shard2 = new Shard().withShardId(SHARD_2);
+    Shard shard3 = new Shard().withShardId(SHARD_3);
+    given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+        .withStreamDescription(new StreamDescription()
+            .withShards(shard1, shard2)
+            .withHasMoreShards(true)));
+    given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+        .withStreamDescription(new StreamDescription()
+            .withShards(shard3)
+            .withHasMoreShards(false)));
+
+    List<Shard> shards = underTest.listShards(STREAM);
+
+    assertThat(shards).containsOnly(shard1, shard2, shard3);
+  }
+
+  @Test
+  public void shouldHandleExpiredIterationExceptionForShardListing() {
+    shouldHandleShardListingError(new ExpiredIteratorException(""),
+        ExpiredIteratorException.class);
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForShardListing() {
+    shouldHandleShardListingError(new LimitExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+    shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class);
+  }
-    private static final String STREAM = "stream";
-    private static final String SHARD_1 = "shard-01";
-    private static final String SHARD_2 = "shard-02";
-    private static final String SHARD_3 = "shard-03";
-    private static final String SHARD_ITERATOR = "iterator";
-    private static final String SEQUENCE_NUMBER = "abc123";
-
-    @Mock
-    private AmazonKinesis kinesis;
-    @InjectMocks
-    private SimplifiedKinesisClient underTest;
-
-    @Test
-    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
-        given(kinesis.getShardIterator(new GetShardIteratorRequest()
-            .withStreamName(STREAM)
-            .withShardId(SHARD_1)
-            .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-            .withStartingSequenceNumber(SEQUENCE_NUMBER)
-        )).willReturn(new GetShardIteratorResult()
-            .withShardIterator(SHARD_ITERATOR));
-
-        String stream = underTest.getShardIterator(STREAM, SHARD_1,
-            ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
-
-        assertThat(stream).isEqualTo(SHARD_ITERATOR);
-    }
-
-    @Test
-    public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
-        Instant timestamp = Instant.now();
-        given(kinesis.getShardIterator(new GetShardIteratorRequest()
-            .withStreamName(STREAM)
-            .withShardId(SHARD_1)
-            .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-            .withTimestamp(timestamp.toDate())
-        )).willReturn(new GetShardIteratorResult()
-            .withShardIterator(SHARD_ITERATOR));
-
-        String stream = underTest.getShardIterator(STREAM, SHARD_1,
-            ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
-
-        assertThat(stream).isEqualTo(SHARD_ITERATOR);
-    }
-
-    @Test
-    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
-            ExpiredIteratorException.class);
-    }
-
-    @Test
-    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new LimitExceededException(""),
-            TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
-            TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleServiceErrorForGetShardIterator() {
-        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
-            TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleClientErrorForGetShardIterator() {
-        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
-            RuntimeException.class);
-    }
-
-    @Test
-    public void shouldHandleUnexpectedExceptionForGetShardIterator() {
-        shouldHandleGetShardIteratorError(new NullPointerException(),
-            RuntimeException.class);
-    }
-
-    private void shouldHandleGetShardIteratorError(
-        Exception thrownException,
-        Class<? extends Exception> expectedExceptionClass) {
-        GetShardIteratorRequest request = new GetShardIteratorRequest()
-            .withStreamName(STREAM)
-            .withShardId(SHARD_1)
-            .withShardIteratorType(ShardIteratorType.LATEST);
-
-        given(kinesis.getShardIterator(request)).willThrow(thrownException);
-
-        try {
-            underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
-            failBecauseExceptionWasNotThrown(expectedExceptionClass);
-        } catch (Exception e) {
-            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
-        } finally {
-            reset(kinesis);
-        }
-    }
-
-    @Test
-    public void shouldListAllShards() throws Exception {
-        Shard shard1 = new Shard().withShardId(SHARD_1);
-        Shard shard2 = new Shard().withShardId(SHARD_2);
-        Shard shard3 = new Shard().withShardId(SHARD_3);
-        given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
-            .withStreamDescription(new StreamDescription()
-                .withShards(shard1, shard2)
-                .withHasMoreShards(true)));
-        given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
-            .withStreamDescription(new StreamDescription()
-                .withShards(shard3)
-                .withHasMoreShards(false)));
-
-        List<Shard> shards = underTest.listShards(STREAM);
-
-        assertThat(shards).containsOnly(shard1, shard2, shard3);
-    }
-
-    @Test
-    public void shouldHandleExpiredIterationExceptionForShardListing() {
-        shouldHandleShardListingError(new ExpiredIteratorException(""),
-            ExpiredIteratorException.class);
-    }
-
-    @Test
-    public void shouldHandleLimitExceededExceptionForShardListing() {
-        shouldHandleShardListingError(new LimitExceededException(""),
-            TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
-        shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
-            TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleServiceErrorForShardListing() {
-        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
-            TransientKinesisException.class);
-    }
-
-    @Test
-    public void shouldHandleClientErrorForShardListing() {
-        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
-            RuntimeException.class);
-    }
-
-    @Test
-    public void shouldHandleUnexpectedExceptionForShardListing() {
-        shouldHandleShardListingError(new NullPointerException(),
-            RuntimeException.class);
-    }
-
-    private void shouldHandleShardListingError(
-        Exception thrownException,
-        Class<? extends Exception> expectedExceptionClass) {
-        given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
-        try {
-            underTest.listShards(STREAM);
-            failBecauseExceptionWasNotThrown(expectedExceptionClass);
-        } catch (Exception e) {
-            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
-        } finally {
-            reset(kinesis);
-        }
-    }
-
-    private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
-        AmazonServiceException exception = new AmazonServiceException("");
-        exception.setErrorType(errorType);
-        return exception;
-    }
+
+  @Test
+  public void shouldHandleServiceErrorForShardListing() {
+    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleClientErrorForShardListing() {
+    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class);
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForShardListing() {
+    shouldHandleShardListingError(new NullPointerException(),
+        RuntimeException.class);
+  }
+
+  private void shouldHandleShardListingError(
+      Exception thrownException,
+      Class<? extends Exception> expectedExceptionClass) {
+    given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+    try {
+      underTest.listShards(STREAM);
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(kinesis);
+    }
+  }
+
+  private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+    AmazonServiceException exception = new AmazonServiceException("");
+    exception.setErrorType(errorType);
+    return exception;
+  }
 }
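
Taken together, the handler tests above pin down an exception-translation contract: throttling errors (LimitExceededException, ProvisionedThroughputExceededException) and server-side AmazonServiceExceptions become a retryable TransientKinesisException, ExpiredIteratorException is rethrown for the caller to refresh, and everything else surfaces as a RuntimeException. A sketch consistent with those assertions (the KinesisCallWrapper class and its TransientKinesisException are hypothetical stand-ins, not the actual SimplifiedKinesisClient source):

    import java.util.concurrent.Callable;

    import com.amazonaws.AmazonServiceException;
    import com.amazonaws.AmazonServiceException.ErrorType;
    import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    import com.amazonaws.services.kinesis.model.LimitExceededException;
    import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;

    final class KinesisCallWrapper {
      // Hypothetical checked exception marking failures that are safe to retry.
      static class TransientKinesisException extends Exception {
        TransientKinesisException(Throwable cause) {
          super(cause);
        }
      }

      static <T> T call(Callable<T> operation) throws TransientKinesisException {
        try {
          return operation.call();
        } catch (ExpiredIteratorException e) {
          throw e; // callers handle the iterator refresh themselves
        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
          throw new TransientKinesisException(e); // throttling: safe to retry
        } catch (AmazonServiceException e) {
          if (e.getErrorType() == ErrorType.Service) {
            throw new TransientKinesisException(e); // server-side fault: retry
          }
          throw new RuntimeException(e); // client-side fault: fail fast
        } catch (Exception e) {
          throw new RuntimeException(e); // anything unexpected is non-transient
        }
      }
    }

The catch order matters: ExpiredIteratorException and the throttling exceptions are subclasses of AmazonServiceException, so they must be handled before the generic service-exception branch.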
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index d93cc41..912e20c 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index 5b5412c..b63775d 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -117,7 +117,7 @@ import org.joda.time.Instant;
  * to the file separated with line feeds.
  * </p>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class MongoDbGridFSIO {
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 3b14182..620df74 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -18,13 +18,12 @@ package org.apache.beam.sdk.io.mongodb;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
-import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoCursor;
@@ -94,27 +93,19 @@ import org.slf4j.LoggerFactory;
  *
  * }</pre>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class MongoDbIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(MongoDbIO.class);
 
   /** Read data from MongoDB. */
   public static Read read() {
-    return new AutoValue_MongoDbIO_Read.Builder()
-        .setKeepAlive(true)
-        .setMaxConnectionIdleTime(60000)
-        .setNumSplits(0)
-        .build();
+    return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build();
   }
 
   /** Write data to MongoDB. */
   public static Write write() {
-    return new AutoValue_MongoDbIO_Write.Builder()
-        .setKeepAlive(true)
-        .setMaxConnectionIdleTime(60000)
-        .setBatchSize(1024L)
-        .build();
+    return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build();
   }
 
   private MongoDbIO() {
@@ -126,20 +117,16 @@ public class MongoDbIO {
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<Document>> {
     @Nullable abstract String uri();
-    abstract boolean keepAlive();
-    abstract int maxConnectionIdleTime();
     @Nullable abstract String database();
     @Nullable abstract String collection();
     @Nullable abstract String filter();
     abstract int numSplits();
 
-    abstract Builder builder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setUri(String uri);
-      abstract Builder setKeepAlive(boolean keepAlive);
-      abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
       abstract Builder setDatabase(String database);
       abstract Builder setCollection(String collection);
      abstract Builder setFilter(String filter);
@@ -148,94 +135,31 @@ public class MongoDbIO {
     }
 
     /**
-     * Define the location of the MongoDB instances using an URI. The URI describes the hosts to
-     * be used and some options.
-     *
-     * <p>The format of the URI is:
-     *
-     * <pre>{@code
-     * mongodb://[username:password@]host1[:port1]...[,hostN[:portN]]][/[database][?options]]
-     * }</pre>
-     *
-     * <p>Where:
-     * <ul>
-     *   <li>{@code mongodb://} is a required prefix to identify that this is a string in the
-     *   standard connection format.</li>
-     *   <li>{@code username:password@} are optional. If given, the driver will attempt to
-     *   login to a database after connecting to a database server. For some authentication
-     *   mechanisms, only the username is specified and the password is not, in which case
-     *   the ":" after the username is left off as well.</li>
-     *   <li>{@code host1} is the only required part of the URI. It identifies a server
-     *   address to connect to.</li>
-     *   <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
-     *   <li>{@code /database} is the name of the database to login to and thus is only
-     *   relevant if the {@code username:password@} syntax is used. If not specified, the
-     *   "admin" database will be used by default. It has to be equivalent with the database
-     *   you specific with {@link Read#withDatabase(String)}.</li>
-     *   <li>{@code ?options} are connection options. Note that if {@code database} is absent
-     *   there is still a {@code /} required between the last {@code host} and the {@code ?}
-     *   introducing the options. Options are name=value pairs and the pairs are separated by
-     *   "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI,
-     *   instead you have to use {@link Read#withKeepAlive(boolean)}. Same for the
-     *   {@code MaxConnectionIdleTime} connection option via
-     *   {@link Read#withMaxConnectionIdleTime(int)}.
-     *   </li>
-     * </ul>
+     * Example documentation for withUri.
      */
     public Read withUri(String uri) {
-      checkArgument(uri != null, "MongoDbIO.read().withUri(uri) called with null uri");
-      return builder().setUri(uri).build();
-    }
-
-    /**
-     * Sets whether socket keep alive is enabled.
-     */
-    public Read withKeepAlive(boolean keepAlive) {
-      return builder().setKeepAlive(keepAlive).build();
-    }
-
-    /**
-     * Sets the maximum idle time for a pooled connection.
-     */
-    public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) {
-      return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
+      checkNotNull(uri);
+      return toBuilder().setUri(uri).build();
     }
 
-    /**
-     * Sets the database to use.
-     */
     public Read withDatabase(String database) {
-      checkArgument(database != null, "MongoDbIO.read().withDatabase(database) called with null"
-          + " database");
-      return builder().setDatabase(database).build();
+      checkNotNull(database);
+      return toBuilder().setDatabase(database).build();
     }
 
-    /**
-     * Sets the collection to consider in the database.
-     */
     public Read withCollection(String collection) {
-      checkArgument(collection != null, "MongoDbIO.read().withCollection(collection) called "
-          + "with null collection");
-      return builder().setCollection(collection).build();
+      checkNotNull(collection);
+      return toBuilder().setCollection(collection).build();
     }
 
-    /**
-     * Sets a filter on the documents in a collection.
-     */
     public Read withFilter(String filter) {
-      checkArgument(filter != null, "MongoDbIO.read().withFilter(filter) called with null "
-          + "filter");
-      return builder().setFilter(filter).build();
+      checkNotNull(filter);
+      return toBuilder().setFilter(filter).build();
     }
 
-    /**
-     * Sets the user defined number of splits.
-     */
     public Read withNumSplits(int numSplits) {
-      checkArgument(numSplits >= 0, "MongoDbIO.read().withNumSplits(numSplits) called with "
-          + "invalid number. The number of splits has to be a positive value (currently %d)",
-          numSplits);
-      return builder().setNumSplits(numSplits).build();
+      checkArgument(numSplits >= 0);
+      return toBuilder().setNumSplits(numSplits).build();
     }
 
     @Override
@@ -245,19 +169,15 @@ public class MongoDbIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      checkState(uri() != null, "MongoDbIO.read() requires an URI to be set via withUri(uri)");
-      checkState(database() != null, "MongoDbIO.read() requires a database to be set via "
-          + "withDatabase(database)");
-      checkState(collection() != null, "MongoDbIO.read() requires a collection to be set via "
-          + "withCollection(collection)");
+      checkNotNull(uri(), "uri");
+      checkNotNull(database(), "database");
+      checkNotNull(collection(), "collection");
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.add(DisplayData.item("uri", uri()));
-      builder.add(DisplayData.item("keepAlive", keepAlive()));
-      builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
       builder.add(DisplayData.item("database", database()));
       builder.add(DisplayData.item("collection", collection()));
       builder.addIfNotNull(DisplayData.item("filter", filter()));
@@ -298,71 +218,61 @@ public class MongoDbIO {
 
     @Override
     public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) {
-      try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
-        return getEstimatedSizeBytes(mongoClient, spec.database(), spec.collection());
-      }
-    }
-
-    private long getEstimatedSizeBytes(MongoClient mongoClient,
-                                       String database,
-                                       String collection) {
-      MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+      MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
+      MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
 
       // get the Mongo collStats object
       // it gives the size for the entire collection
       BasicDBObject stat = new BasicDBObject();
-      stat.append("collStats", collection);
+      stat.append("collStats", spec.collection());
       Document stats = mongoDatabase.runCommand(stat);
-
       return stats.get("size", Number.class).longValue();
     }
 
     @Override
     public List<BoundedSource<Document>> split(long desiredBundleSizeBytes,
                                                PipelineOptions options) {
-      try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) {
-        MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
-
-        List<Document> splitKeys;
-        if (spec.numSplits() > 0) {
-          // the user defines his desired number of splits
-          // calculate the batch size
-          long estimatedSizeBytes = getEstimatedSizeBytes(mongoClient,
-              spec.database(), spec.collection());
-          desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
-        }
-
-        // the desired batch size is small, using default chunk size of 1MB
-        if (desiredBundleSizeBytes < 1024 * 1024) {
-          desiredBundleSizeBytes = 1 * 1024 * 1024;
-        }
-
-        // now we have the batch size (provided by user or provided by the runner)
-        // we use Mongo splitVector command to get the split keys
-        BasicDBObject splitVectorCommand = new BasicDBObject();
-        splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
-        splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
-        splitVectorCommand.append("force", false);
-        // maxChunkSize is the Mongo partition size in MB
-        LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
-        splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
-        Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
-        splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
-
-        List<BoundedSource<Document>> sources = new ArrayList<>();
-        if (splitKeys.size() < 1) {
-          LOG.debug("Split keys is low, using an unique source");
-          sources.add(this);
-          return sources;
-        }
+      MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()));
+      MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database());
+
+      List<Document> splitKeys;
+      if (spec.numSplits() > 0) {
+        // the user defines his desired number of splits
+        // calculate the batch size
+        long estimatedSizeBytes = getEstimatedSizeBytes(options);
+        desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits();
+      }
 
-        LOG.debug("Number of splits is {}", splitKeys.size());
-        for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
-          sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
-        }
+      // the desired batch size is small, using default chunk size of 1MB
+      if (desiredBundleSizeBytes < 1024 * 1024) {
+        desiredBundleSizeBytes = 1 * 1024 * 1024;
+      }
 
+      // now we have the batch size (provided by user or provided by the runner)
+      // we use Mongo splitVector command to get the split keys
+      BasicDBObject splitVectorCommand = new BasicDBObject();
+      splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection());
+      splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1));
+      splitVectorCommand.append("force", false);
+      // maxChunkSize is the Mongo partition size in MB
+      LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024);
+      splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024);
+      Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand);
+      splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys");
+
+      List<BoundedSource<Document>> sources = new ArrayList<>();
+      if (splitKeys.size() < 1) {
+        LOG.debug("Split keys is low, using an unique source");
+        sources.add(this);
         return sources;
       }
+
+      LOG.debug("Number of splits is {}", splitKeys.size());
+      for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) {
+        sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter)));
+      }
+
+      return sources;
     }
 
     /**
@@ -457,10 +367,7 @@ public class MongoDbIO {
     @Override
     public boolean start() {
       Read spec = source.spec;
-      MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
-      optionsBuilder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
-      optionsBuilder.socketKeepAlive(spec.keepAlive());
-      client = new MongoClient(new MongoClientURI(spec.uri(), optionsBuilder));
+      client = new MongoClient(new MongoClientURI(spec.uri()));
 
       MongoDatabase mongoDatabase = client.getDatabase(spec.database());
 
@@ -519,106 +426,36 @@ public class MongoDbIO {
    */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<Document>, PDone> {
-
     @Nullable abstract String uri();
-    abstract boolean keepAlive();
-    abstract int maxConnectionIdleTime();
     @Nullable abstract String database();
     @Nullable abstract String collection();
     abstract long batchSize();
 
-    abstract Builder builder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setUri(String uri);
-      abstract Builder setKeepAlive(boolean keepAlive);
-      abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
       abstract Builder setDatabase(String database);
       abstract Builder setCollection(String collection);
       abstract Builder setBatchSize(long batchSize);
       abstract Write build();
     }
 
-    /**
-     * Define the location of the MongoDB instances using an URI. The URI describes the hosts to
-     * be used and some options.
-     *
-     * <p>The format of the URI is:
-     *
-     * <pre>{@code
-     * mongodb://[username:password@]host1[:port1],...[,hostN[:portN]]][/[database][?options]]
-     * }</pre>
-     *
-     * <p>Where:
-     * <ul>
-     *   <li>{@code mongodb://} is a required prefix to identify that this is a string in the
-     *   standard connection format.</li>
-     *   <li>{@code username:password@} are optional. If given, the driver will attempt to
-     *   login to a database after connecting to a database server. For some authentication
-     *   mechanisms, only the username is specified and the password is not, in which case
-     *   the ":" after the username is left off as well.</li>
-     *   <li>{@code host1} is the only required part of the URI. It identifies a server
-     *   address to connect to.</li>
-     *   <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li>
-     *   <li>{@code /database} is the name of the database to login to and thus is only
-     *   relevant if the {@code username:password@} syntax is used. If not specified, the
-     *   "admin" database will be used by default. It has to be equivalent with the database
-     *   you specific with {@link Write#withDatabase(String)}.</li>
-     *   <li>{@code ?options} are connection options. Note that if {@code database} is absent
-     *   there is still a {@code /} required between the last {@code host} and the {@code ?}
-     *   introducing the options. Options are name=value pairs and the pairs are separated by
-     *   "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI, instead
-     *   you have to use {@link Write#withKeepAlive(boolean)}. Same for the
-     *   {@code MaxConnectionIdleTime} connection option via
-     *   {@link Write#withMaxConnectionIdleTime(int)}.
-     *   </li>
-     * </ul>
-     */
     public Write withUri(String uri) {
-      checkArgument(uri != null, "MongoDbIO.write().withUri(uri) called with null uri");
-      return builder().setUri(uri).build();
-    }
-
-    /**
-     * Sets whether socket keep alive is enabled.
-     */
-    public Write withKeepAlive(boolean keepAlive) {
-      return builder().setKeepAlive(keepAlive).build();
-    }
-
-    /**
-     * Sets the maximum idle time for a pooled connection.
-     */
-    public Write withMaxConnectionIdleTime(int maxConnectionIdleTime) {
-      return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
+      return toBuilder().setUri(uri).build();
     }
 
-    /**
-     * Sets the database to use.
-     */
     public Write withDatabase(String database) {
-      checkArgument(database != null, "MongoDbIO.write().withDatabase(database) called with "
-          + "null database");
-      return builder().setDatabase(database).build();
+      return toBuilder().setDatabase(database).build();
     }
 
-    /**
-     * Sets the collection where to write data in the database.
-     */
     public Write withCollection(String collection) {
-      checkArgument(collection != null, "MongoDbIO.write().withCollection(collection) called "
-          + "with null collection");
-      return builder().setCollection(collection).build();
+      return toBuilder().setCollection(collection).build();
    }
 
-    /**
-     * Define the size of the batch to group write operations.
-     */
     public Write withBatchSize(long batchSize) {
-      checkArgument(batchSize >= 0, "MongoDbIO.write().withBatchSize(batchSize) called with "
-          + "invalid batch size. Batch size has to be >= 0 (currently %d)", batchSize);
-      return builder().setBatchSize(batchSize).build();
+      return toBuilder().setBatchSize(batchSize).build();
     }
 
    @Override
@@ -629,21 +466,10 @@ public class MongoDbIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      checkState(uri() != null, "MongoDbIO.write() requires an URI to be set via withUri(uri)");
-      checkState(database() != null, "MongoDbIO.write() requires a database to be set via "
-          + "withDatabase(database)");
-      checkState(collection() != null, "MongoDbIO.write() requires a collection to be set via "
-          + "withCollection(collection)");
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(DisplayData.item("uri", uri()));
-      builder.add(DisplayData.item("keepAlive", keepAlive()));
-      builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
-      builder.add(DisplayData.item("database", database()));
-      builder.add(DisplayData.item("collection", collection()));
-      builder.add(DisplayData.item("batchSize", batchSize()));
+      checkNotNull(uri(), "uri");
+      checkNotNull(database(), "database");
+      checkNotNull(collection(), "collection");
+      checkNotNull(batchSize(), "batchSize");
     }
 
     private static class WriteFn extends DoFn<Document, Void> {
@@ -657,10 +483,7 @@ public class MongoDbIO {
 
       @Setup
      public void createMongoClient() throws Exception {
-        MongoClientOptions.Builder builder = new MongoClientOptions.Builder();
-        builder.socketKeepAlive(spec.keepAlive());
-        builder.maxConnectionIdleTime(spec.maxConnectionIdleTime());
-        client = new MongoClient(new MongoClientURI(spec.uri(), builder));
+        client = new MongoClient(new MongoClientURI(spec.uri()));
       }
 
       @StartBundle
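
The size-estimation and split logic above leans on two MongoDB commands: collStats (for the collection size in bytes) and splitVector (for _id boundaries that cut the collection into chunks of at most maxChunkSize MB). A hedged standalone sketch of the same two calls against a local mongod, outside Beam; the demo.scientists namespace, localhost URI, and 4-way split target are made-up values for illustration:

    import com.mongodb.BasicDBObject;
    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientURI;
    import com.mongodb.client.MongoDatabase;
    import java.util.List;
    import org.bson.Document;

    public class SplitVectorDemo {
      public static void main(String[] args) {
        try (MongoClient client = new MongoClient(new MongoClientURI("mongodb://localhost:27017"))) {
          MongoDatabase db = client.getDatabase("demo");

          // collStats reports the collection size in bytes, as used by
          // getEstimatedSizeBytes() above.
          Document stats = db.runCommand(new BasicDBObject("collStats", "scientists"));
          long sizeBytes = stats.get("size", Number.class).longValue();

          // splitVector asks mongod for _id boundaries so that each chunk is at
          // most maxChunkSize MB; here we aim for roughly four chunks.
          BasicDBObject splitVector = new BasicDBObject();
          splitVector.append("splitVector", "demo.scientists");
          splitVector.append("keyPattern", new BasicDBObject("_id", 1));
          splitVector.append("force", false);
          splitVector.append("maxChunkSize", Math.max(1L, sizeBytes / 4 / 1024 / 1024));
          Document result = db.runCommand(splitVector);

          @SuppressWarnings("unchecked")
          List<Document> splitKeys = (List<Document>) result.get("splitKeys");
          System.out.println("split keys: " + splitKeys);
        }
      }
    }

Each returned split key then becomes the boundary of one filter (see splitKeysToFilters in the source), so every bundle reads a disjoint _id range.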
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 67dbca4..cd26b48 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.mongodb;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
@@ -190,42 +189,6 @@ public class MongoDbIOTest implements Serializable {
   }
 
   @Test
-  public void testReadWithCustomConnectionOptions() throws Exception {
-    MongoDbIO.Read read = MongoDbIO.read()
-        .withUri("mongodb://localhost:" + port)
-        .withKeepAlive(false)
-        .withMaxConnectionIdleTime(10)
-        .withDatabase(DATABASE)
-        .withCollection(COLLECTION);
-    assertFalse(read.keepAlive());
-    assertEquals(10, read.maxConnectionIdleTime());
-
-    PCollection<Document> documents = pipeline.apply(read);
-
-    PAssert.thatSingleton(documents.apply("Count All", Count.<Document>globally()))
-        .isEqualTo(1000L);
-
-    PAssert.that(documents
-        .apply("Map Scientist", MapElements.via(new SimpleFunction<Document, KV<String, Void>>() {
-          public KV<String, Void> apply(Document input) {
-            return KV.of(input.getString("scientist"), null);
-          }
-        }))
-        .apply("Count Scientist", Count.<String, Void>perKey())
-    ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
-      @Override
-      public Void apply(Iterable<KV<String, Long>> input) {
-        for (KV<String, Long> element : input) {
-          assertEquals(100L, element.getValue().longValue());
-        }
-        return null;
-      }
-    });
-
-    pipeline.run();
-  }
-
-  @Test
   public void testReadWithFilter() throws Exception {
     PCollection<Document> output = pipeline.apply(

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/pom.xml b/sdks/java/io/mqtt/pom.xml
index 9fa1dc0..baaf771 100644
--- a/sdks/java/io/mqtt/pom.xml
+++ b/sdks/java/io/mqtt/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index add5cb5..228a85d 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -97,7 +97,7 @@ import org.slf4j.LoggerFactory;
  *
  * }</pre>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class MqttIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(MqttIO.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index b7909fa..44f3baa 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
@@ -32,8 +32,38 @@
   <description>Beam SDK Java IO provides different connectivity components
   (sources and sinks) to consume and produce data from systems.</description>
 
+  <properties>
+    <!--
+      This is the version of Hadoop used to compile the hadoop-common module.
+      This dependency is defined with a provided scope.
+      Users must supply their own Hadoop version at runtime.
+    -->
+    <hadoop.version>2.7.3</hadoop.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-common</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <modules>
-    <module>amqp</module>
     <module>cassandra</module>
     <module>common</module>
     <module>elasticsearch</module>
@@ -42,7 +72,6 @@
     <module>hadoop-file-system</module>
     <module>hadoop</module>
     <module>hbase</module>
-    <module>hcatalog</module>
     <module>jdbc</module>
     <module>jms</module>
     <module>kafka</module>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
index 7b5804e..cf7dd33 100644
--- a/sdks/java/io/xml/pom.xml
+++ b/sdks/java/io/xml/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 442fba5..7255a94 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -522,8 +521,7 @@ public class XmlIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity()));
+      return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 74e0bda..6ae83f2 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -25,7 +25,6 @@ import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Marshaller;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -35,18 +34,18 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** Implementation of {@link XmlIO#write}. */
-class XmlSink<T> extends FileBasedSink<T, Void> {
+class XmlSink<T> extends FileBasedSink<T> {
   private static final String XML_EXTENSION = ".xml";
 
   private final XmlIO.Write<T> spec;
 
-  private static <T> DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<T> spec) {
-    return DefaultFilenamePolicy.fromStandardParameters(
+  private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
+    return DefaultFilenamePolicy.constructUsingStandardParameters(
         spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION, false);
   }
 
   XmlSink(XmlIO.Write<T> spec) {
-    super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec)));
+    super(spec.getFilenamePrefix(), makeFilenamePolicy(spec));
     this.spec = spec;
   }
 
@@ -76,8 +75,10 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
     super.populateDisplayData(builder);
   }
 
-  /** {@link WriteOperation} for XML {@link FileBasedSink}s. */
-  protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> {
+  /**
+   * {@link WriteOperation} for XML {@link FileBasedSink}s.
+   */
+  protected static final class XmlWriteOperation<T> extends WriteOperation<T> {
     public XmlWriteOperation(XmlSink<T> sink) {
       super(sink);
     }
@@ -111,8 +112,10 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
     }
   }
 
-  /** A {@link Writer} that can write objects as XML elements. */
-  protected static final class XmlWriter<T> extends Writer<T, Void> {
+  /**
+   * A {@link Writer} that can write objects as XML elements.
+   */
+  protected static final class XmlWriter<T> extends Writer<T> {
     final Marshaller marshaller;
     private OutputStream os = null;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index d1584dc..aa0c1c3 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -197,8 +197,8 @@ public class XmlSinkTest {
         .withRecordClass(Integer.class);
 
     DisplayData displayData = DisplayData.from(write);
-    assertThat(
-        displayData, hasDisplayItem("filenamePattern", "/path/to/file-SSSSS-of-NNNNN" + ".xml"));
+
+    assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml"));
     assertThat(displayData, hasDisplayItem("rootElement", "bird"));
     assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index 2378014..b90a757 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 51109fb..54dae3a 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../../../pom.xml</relativePath>
   </parent>
@@ -99,16 +99,6 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-amqp</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-cassandra</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
     </dependency>
@@ -134,11 +124,6 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-hcatalog</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-jdbc</artifactId>
     </dependency>
@@ -211,11 +196,13 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
+      <version>${spark.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
+      <version>${spark.version}</version>
     </dependency>
   </dependencies>
a/sdks/java/maven-archetypes/examples-java8/pom.xml +++ b/sdks/java/maven-archetypes/examples-java8/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml index 4517861..af4fbd3 100644 --- a/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml +++ b/sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources/pom.xml @@ -242,6 +242,7 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> <scope>runtime</scope> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml index 2a02039..c1378cb 100644 --- a/sdks/java/maven-archetypes/examples/pom.xml +++ b/sdks/java/maven-archetypes/examples/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml index d039ddb..b8b9c9f 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml @@ -241,6 +241,7 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> <scope>runtime</scope> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/pom.xml b/sdks/java/maven-archetypes/pom.xml index d676b31..b7fe274 100644 --- a/sdks/java/maven-archetypes/pom.xml +++ b/sdks/java/maven-archetypes/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/starter/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml index 8024b52..06b41c8 100644 --- 
a/sdks/java/maven-archetypes/starter/pom.xml +++ b/sdks/java/maven-archetypes/starter/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-maven-archetypes-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml index 6056fb0..60405e6 100644 --- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml +++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml @@ -28,7 +28,7 @@ <beam.version>@project.version@</beam.version> <maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version> - <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version> + <maven-exec-plugin.version>1.4.0</maven-exec-plugin.version> <slf4j.version>1.7.14</slf4j.version> </properties> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index 3144193..250c85a 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/pom.xml b/sdks/pom.xml index aec8762..27b9610 100644 --- a/sdks/pom.xml +++ b/sdks/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 2670250..10298bf 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -710,10 +710,6 @@ class WindowedValueCoderImpl(StreamCoderImpl): timestamp = MAX_TIMESTAMP.micros else: timestamp *= 1000 - if timestamp > MAX_TIMESTAMP.micros: - timestamp = MAX_TIMESTAMP.micros - if timestamp < MIN_TIMESTAMP.micros: - timestamp = MIN_TIMESTAMP.micros windows = self._windows_coder.decode_from_stream(in_stream, True) # Read PaneInfo encoded byte. 
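[Editor's note: the coder_impl.py hunk above removes range-clamping from the millisecond-to-microsecond decode step. A minimal standalone Python sketch of the clamping those removed lines performed; MAX_MICROS and MIN_MICROS are placeholder bounds, not Beam's actual MAX_TIMESTAMP.micros / MIN_TIMESTAMP.micros values.]

# Sketch of clamp-on-decode: convert a decoded millisecond timestamp to
# microseconds and pin it inside a representable range.
MAX_MICROS = (2 ** 63 - 1)   # placeholder upper bound
MIN_MICROS = -(2 ** 63)      # placeholder lower bound


def millis_to_clamped_micros(timestamp_millis):
    """Convert millis to micros, clamping out-of-range values."""
    timestamp = timestamp_millis * 1000
    if timestamp > MAX_MICROS:
        timestamp = MAX_MICROS
    if timestamp < MIN_MICROS:
        timestamp = MIN_MICROS
    return timestamp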
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index c56ef52..f40045d 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -25,7 +25,6 @@ import cPickle as pickle
 import google.protobuf

 from apache_beam.coders import coder_impl
-from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.utils import urns
 from apache_beam.utils import proto_utils

@@ -206,6 +205,7 @@ class Coder(object):
     """For internal use only; no backwards-compatibility guarantees.
     """
     # TODO(BEAM-115): Use specialized URNs and components.
+    from apache_beam.runners.api import beam_runner_api_pb2
     return beam_runner_api_pb2.Coder(
         spec=beam_runner_api_pb2.SdkFunctionSpec(
             spec=beam_runner_api_pb2.FunctionSpec(
@@ -286,11 +286,6 @@ class BytesCoder(FastCoder):
   def is_deterministic(self):
     return True

-  def as_cloud_object(self):
-    return {
-        '@type': 'kind:bytes',
-    }
-
   def __eq__(self, other):
     return type(self) == type(other)

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 577c53a..c9b67b3 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -23,8 +23,6 @@ import unittest

 import dill

-from apache_beam.transforms.window import GlobalWindow
-from apache_beam.utils.timestamp import MIN_TIMESTAMP
 import observable
 from apache_beam.transforms import window
 from apache_beam.utils import timestamp

@@ -289,12 +287,6 @@ class CodersTest(unittest.TestCase):
     # Test binary representation
     self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
                      coder.encode(window.GlobalWindows.windowed_value(1)))
-
-    # Test decoding large timestamp
-    self.assertEqual(
-        coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'),
-        windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),)))
-
     # Test unnested
     self.check_coder(
         coders.WindowedValueCoder(coders.VarIntCoder()),

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 31f71b3..9183d0d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -589,22 +589,6 @@ class SnippetsTest(unittest.TestCase):
     snippets.model_textio_compressed(
         {'read': gzip_file_name}, ['aa', 'bb', 'cc'])

-  def test_model_textio_gzip_concatenated(self):
-    temp_path_1 = self.create_temp_file('a\nb\nc\n')
-    temp_path_2 = self.create_temp_file('p\nq\nr\n')
-    temp_path_3 = self.create_temp_file('x\ny\nz')
-    gzip_file_name = temp_path_1 + '.gz'
-    with open(temp_path_1) as src, gzip.open(gzip_file_name, 'wb') as dst:
-      dst.writelines(src)
-    with open(temp_path_2) as src, gzip.open(gzip_file_name, 'ab') as dst:
-      dst.writelines(src)
-    with open(temp_path_3) as src, gzip.open(gzip_file_name, 'ab') as dst:
-      dst.writelines(src)
-    # Add the temporary gzip file to be cleaned up as well.
-    self.temp_files.append(gzip_file_name)
-    snippets.model_textio_compressed(
-        {'read': gzip_file_name}, ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'])
-
   @unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
   def test_model_datastoreio(self):
     # We cannot test datastoreio functionality in unit tests therefore we limit

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 7696d77..ed8b5d0 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -25,44 +25,35 @@ from __future__ import absolute_import

 import argparse
 import logging
+import re

 import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.options.pipeline_options import StandardOptions
 import apache_beam.transforms.window as window


-def split_fn(lines):
-  import re
-  return re.findall(r'[A-Za-z\']+', lines)
-
-
 def run(argv=None):
   """Build and run the pipeline."""
+
   parser = argparse.ArgumentParser()
   parser.add_argument(
       '--input_topic', required=True,
-      help=('Input PubSub topic of the form '
-            '"projects/<PROJECT>/topics/<TOPIC>".'))
+      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
   parser.add_argument(
       '--output_topic', required=True,
-      help=('Output PubSub topic of the form '
-            '"projects/<PROJECT>/topic/<TOPIC>".'))
+      help='Output PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
   known_args, pipeline_args = parser.parse_known_args(argv)

-  options = PipelineOptions(pipeline_args)
-  options.view_as(StandardOptions).streaming = True
-  with beam.Pipeline(options=options) as p:
+  with beam.Pipeline(argv=pipeline_args) as p:

-    # Read from PubSub into a PCollection.
+    # Read the text file[pattern] into a PCollection.
     lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)

     # Capitalize the characters in each line.
     transformed = (lines
-                   # Use a pre-defined function that imports the re package.
                    | 'Split' >> (
-                       beam.FlatMap(split_fn).with_output_types(unicode))
+                       beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+                       .with_output_types(unicode))
                    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                    | beam.WindowInto(window.FixedWindows(15, 0))
                    | 'Group' >> beam.GroupByKey()

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/examples/windowed_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
deleted file mode 100644
index bd57847..0000000
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A streaming word-counting workflow.
-
-Important: streaming pipeline support in Python Dataflow is in development
-and is not yet available for use.
-"""
-
-from __future__ import absolute_import
-
-import argparse
-import logging
-
-
-import apache_beam as beam
-import apache_beam.transforms.window as window
-
-TABLE_SCHEMA = ('word:STRING, count:INTEGER, '
-                'window_start:TIMESTAMP, window_end:TIMESTAMP')
-
-
-def find_words(element):
-  import re
-  return re.findall(r'[A-Za-z\']+', element)
-
-
-class FormatDoFn(beam.DoFn):
-  def process(self, element, window=beam.DoFn.WindowParam):
-    ts_format = '%Y-%m-%d %H:%M:%S.%f UTC'
-    window_start = window.start.to_utc_datetime().strftime(ts_format)
-    window_end = window.end.to_utc_datetime().strftime(ts_format)
-    return [{'word': element[0],
-             'count': element[1],
-             'window_start': window_start,
-             'window_end': window_end}]
-
-
-def run(argv=None):
-  """Build and run the pipeline."""
-
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      '--input_topic', required=True,
-      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
-  parser.add_argument(
-      '--output_table', required=True,
-      help=
-      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
-       'or DATASET.TABLE.'))
-  known_args, pipeline_args = parser.parse_known_args(argv)
-
-  with beam.Pipeline(argv=pipeline_args) as p:
-
-    # Read the text from PubSub messages
-    lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
-
-    # Capitalize the characters in each line.
-    transformed = (lines
-                   | 'Split' >> (beam.FlatMap(find_words)
-                                 .with_output_types(unicode))
-                   | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
-                   | beam.WindowInto(window.FixedWindows(2*60, 0))
-                   | 'Group' >> beam.GroupByKey()
-                   | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
-                   | 'Format' >> beam.ParDo(FormatDoFn()))
-
-    # Write to BigQuery.
-    # pylint: disable=expression-not-assigned
-    transformed | 'Write' >> beam.io.WriteToBigQuery(
-        known_args.output_table,
-        schema=TABLE_SCHEMA,
-        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  run()

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 1f65d0a..db6a1d0 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -26,8 +26,6 @@ import zlib
 import logging
 import time

-from apache_beam.utils.plugin import BeamPlugin
-
 logger = logging.getLogger(__name__)

 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
@@ -201,14 +199,6 @@ class CompressedFile(object):
             assert False, 'Possible file corruption.'
           except EOFError:
             pass  # All is as expected!
-        elif self._compression_type == CompressionTypes.GZIP:
-          # If Gzip file check if there is unused data generated by gzip concat
-          if self._decompressor.unused_data != '':
-            buf = self._decompressor.unused_data
-            self._decompressor = zlib.decompressobj(self._gzip_mask)
-            decompressed = self._decompressor.decompress(buf)
-            self._read_buffer.write(decompressed)
-            continue
         else:
           self._read_buffer.write(self._decompressor.flush())
@@ -419,7 +409,7 @@ class BeamIOError(IOError):
     self.exception_details = exception_details


-class FileSystem(BeamPlugin):
+class FileSystem(object):
   """A class that defines the functions that can be performed on a filesystem.

   All methods are abstract and they are for file system providers to
@@ -439,6 +429,16 @@ class FileSystem(BeamPlugin):
     return compression_type

   @classmethod
+  def get_all_subclasses(cls):
+    """Get all the subclasses of the FileSystem class
+    """
+    all_subclasses = []
+    for subclass in cls.__subclasses__():
+      all_subclasses.append(subclass)
+      all_subclasses.extend(subclass.get_all_subclasses())
+    return all_subclasses
+
+  @classmethod
   def scheme(cls):
     """URI scheme for the FileSystem
     """

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 643fbc7..d43c8ba 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -31,7 +31,6 @@ import re
 import threading
 import time
 import traceback
-import httplib2

 from apache_beam.utils import retry

@@ -69,10 +68,6 @@ except ImportError:
 # +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024

-# This is the number of seconds the library will wait for GCS operations to
-# complete.
-DEFAULT_HTTP_TIMEOUT_SECONDS = 60
-
 # This is the number of seconds the library will wait for a partial-file read
 # operation from GCS to complete before retrying.
 DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
@@ -104,7 +99,6 @@ class GcsIO(object):

   def __new__(cls, storage_client=None):
     if storage_client:
-      # This path is only used for testing.
       return super(GcsIO, cls).__new__(cls, storage_client)
     else:
       # Create a single storage client for each thread. We would like to avoid
@@ -114,9 +108,7 @@ class GcsIO(object):
       local_state = threading.local()
       if getattr(local_state, 'gcsio_instance', None) is None:
         credentials = auth.get_service_credentials()
-        storage_client = storage.StorageV1(
-            credentials=credentials,
-            http=httplib2.Http(timeout=DEFAULT_HTTP_TIMEOUT_SECONDS))
+        storage_client = storage.StorageV1(credentials=credentials)
         local_state.gcsio_instance = (
             super(GcsIO, cls).__new__(cls, storage_client))
         local_state.gcsio_instance.client = storage_client
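[Editor's note: the filesystem.py hunk above adds a get_all_subclasses classmethod that walks the subclass tree recursively. A self-contained Python sketch of that discovery pattern; the Base/Local/Remote/Sharded hierarchy is hypothetical, for illustration only.]

class Base(object):
    @classmethod
    def get_all_subclasses(cls):
        # Collect direct subclasses, then recurse into each one so that
        # transitive subclasses (subclasses of subclasses) are found too.
        all_subclasses = []
        for subclass in cls.__subclasses__():
            all_subclasses.append(subclass)
            all_subclasses.extend(subclass.get_all_subclasses())
        return all_subclasses


class Local(Base):
    pass


class Remote(Base):
    pass


class Sharded(Remote):
    pass


if __name__ == '__main__':
    # Prints ['Local', 'Remote', 'Sharded']: the whole tree, not just
    # the direct children that cls.__subclasses__() alone would return.
    print([c.__name__ for c in Base.get_all_subclasses()])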
