http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/pom.xml ---------------------------------------------------------------------- diff --git a/sdk/pom.xml b/sdk/pom.xml deleted file mode 100644 index 13fe950..0000000 --- a/sdk/pom.xml +++ /dev/null @@ -1,771 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>parent</artifactId> - <version>0.1.0-incubating-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>java-sdk-all</artifactId> - <name>Apache Beam :: SDK :: Java All</name> - <description>Beam SDK Java All provides a simple, Java-based - interface for processing virtually any size data. This - artifact includes the entire Apache Beam Java SDK.</description> - - <packaging>jar</packaging> - - <properties> - <timestamp>${maven.build.timestamp}</timestamp> - <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format> - <dataflow>com.google.cloud.dataflow</dataflow> - <runIntegrationTestOnService>false</runIntegrationTestOnService> - <testParallelValue>none</testParallelValue> - <testGroups></testGroups> - <dataflowProjectName></dataflowProjectName> - </properties> - - <profiles> - <profile> - <id>DataflowPipelineTests</id> - <properties> - <runIntegrationTestOnService>true</runIntegrationTestOnService> - <testGroups>com.google.cloud.dataflow.sdk.testing.RunnableOnService</testGroups> - <testParallelValue>both</testParallelValue> - </properties> - </profile> - </profiles> - - <build> - <resources> - <resource> - <directory>src/main/resources</directory> - <filtering>true</filtering> - </resource> - </resources> - - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <goals><goal>analyze-only</goal></goals> - <configuration> - <failOnWarning>true</failOnWarning> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Run CheckStyle pass on transforms, as they are released in - source form. 
--> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <version>2.12</version> - <dependencies> - <dependency> - <groupId>com.puppycrawl.tools</groupId> - <artifactId>checkstyle</artifactId> - <version>6.6</version> - </dependency> - </dependencies> - <configuration> - <configLocation>../checkstyle.xml</configLocation> - <consoleOutput>true</consoleOutput> - <failOnViolation>true</failOnViolation> - <includeResources>false</includeResources> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - <excludes>${project.build.directory}/generated-test-sources/**</excludes> - </configuration> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <id>default-jar</id> - <goals> - <goal>jar</goal> - </goals> - </execution> - <execution> - <id>default-test-jar</id> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <!-- Source plugin for generating source and test-source JARs. --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>2.4</version> - <executions> - <execution> - <id>attach-sources</id> - <phase>compile</phase> - <goals> - <goal>jar</goal> - </goals> - </execution> - <execution> - <id>attach-test-sources</id> - <phase>test-compile</phase> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> - <configuration> - <windowtitle>Google Cloud Dataflow SDK ${project.version} API</windowtitle> - <doctitle>Google Cloud Dataflow SDK for Java, version ${project.version}</doctitle> - <overview>../javadoc/overview.html</overview> - - <subpackages>com.google.cloud.dataflow.sdk</subpackages> - <additionalparam>-exclude com.google.cloud.dataflow.sdk.runners.worker:com.google.cloud.dataflow.sdk.runners.dataflow:com.google.cloud.dataflow.sdk.util:com.google.cloud.dataflow.sdk.runners.inprocess ${dataflow.javadoc_opts}</additionalparam> - <use>false</use> - <quiet>true</quiet> - <bottom><![CDATA[<br>]]></bottom> - - <offlineLinks> - <offlineLink> - <url>https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/</url> - <location>${basedir}/../javadoc/apiclient-docs</location> - </offlineLink> - <offlineLink> - <url>http://avro.apache.org/docs/1.7.7/api/java/</url> - <location>${basedir}/../javadoc/avro-docs</location> - </offlineLink> - <offlineLink> - <url>https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/</url> - <location>${basedir}/../javadoc/bq-docs</location> - </offlineLink> - <offlineLink> - <url>https://cloud.google.com/datastore/docs/apis/javadoc/</url> - <location>${basedir}/../javadoc/datastore-docs</location> - </offlineLink> - <offlineLink> - <url>http://docs.guava-libraries.googlecode.com/git-history/release19/javadoc/</url> - <location>${basedir}/../javadoc/guava-docs</location> - </offlineLink> - <offlineLink> - <url>http://hamcrest.org/JavaHamcrest/javadoc/1.3/</url> - <location>${basedir}/../javadoc/hamcrest-docs</location> - </offlineLink> - <offlineLink> - <url>http://fasterxml.github.io/jackson-annotations/javadoc/2.7/</url> - <location>${basedir}/../javadoc/jackson-annotations-docs</location> - </offlineLink> 
- <offlineLink> - <url>http://fasterxml.github.io/jackson-databind/javadoc/2.7/</url> - <location>${basedir}/../javadoc/jackson-databind-docs</location> - </offlineLink> - <offlineLink> - <url>http://www.joda.org/joda-time/apidocs</url> - <location>${basedir}/../javadoc/joda-docs</location> - </offlineLink> - <offlineLink> - <url>http://junit.sourceforge.net/javadoc/</url> - <location>${basedir}/../javadoc/junit-docs</location> - </offlineLink> - <offlineLink> - <url>https://developers.google.com/api-client-library/java/google-oauth-java-client/reference/1.20.0/</url> - <location>${basedir}/../javadoc/oauth-docs</location> - </offlineLink> - </offlineLinks> - </configuration> - <executions> - <execution> - <goals> - <goal>jar</goal> - </goals> - <phase>package</phase> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.4.1</version> - <executions> - <!-- In the first phase, we pick dependencies and relocate them. --> - <execution> - <id>bundle-and-repackage</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <shadeTestJar>true</shadeTestJar> - <artifactSet> - <includes> - <include>com.google.cloud.bigtable:bigtable-client-core</include> - <include>com.google.guava:guava</include> - </includes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - <relocations> - <!-- TODO: Once ready, change the following pattern to 'com' - only, exclude 'com.google.cloud.dataflow.**', and remove - the second relocation. --> - <relocation> - <pattern>com.google.common</pattern> - <shadedPattern>com.google.cloud.dataflow.sdk.repackaged.com.google.common</shadedPattern> - </relocation> - <relocation> - <pattern>com.google.thirdparty</pattern> - <shadedPattern>com.google.cloud.dataflow.sdk.repackaged.com.google.thirdparty</shadedPattern> - </relocation> - <relocation> - <pattern>com.google.cloud.bigtable</pattern> - <shadedPattern>com.google.cloud.dataflow.sdk.repackaged.com.google.cloud.bigtable</shadedPattern> - <excludes> - <exclude>com.google.cloud.bigtable.config.BigtableOptions*</exclude> - <exclude>com.google.cloud.bigtable.config.CredentialOptions*</exclude> - <exclude>com.google.cloud.bigtable.config.RetryOptions*</exclude> - <exclude>com.google.cloud.bigtable.grpc.BigtableClusterName</exclude> - <exclude>com.google.cloud.bigtable.grpc.BigtableTableName</exclude> - </excludes> - </relocation> - </relocations> - </configuration> - </execution> - - <!-- In the second phase, we pick remaining dependencies and bundle - them without repackaging. --> - <execution> - <id>bundle-rest-without-repackaging</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <shadeTestJar>true</shadeTestJar> - <finalName>${project.artifactId}-bundled-${project.version}</finalName> - <artifactSet> - <excludes> - <exclude>com.google.cloud.bigtable:bigtable-client-core</exclude> - <exclude>com.google.guava:guava</exclude> - </excludes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Coverage analysis for unit tests. 
--> - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> - </plugin> - - <!-- Avro plugin for automatic code generation --> - <plugin> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <version>${avro.version}</version> - <executions> - <execution> - <id>schemas</id> - <phase>generate-sources</phase> - <goals> - <goal>schema</goal> - </goals> - <configuration> - <testSourceDirectory>${project.basedir}/src/test/</testSourceDirectory> - <testOutputDirectory>${project.build.directory}/generated-test-sources/java</testOutputDirectory> - </configuration> - </execution> - </executions> - </plugin> - - <!-- This plugin tells Maven about an additional test-source directory to - build, which contains Avro-generated source files. This is not - strictly needed for the regular Maven build, but helps certain IDEs - automatically find and compile generated code. --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>${project.build.directory}/generated-test-sources/java</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-dataflow</artifactId> - <version>${dataflow.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>io.grpc</groupId> - <artifactId>grpc-all</artifactId> - <version>0.12.0</version> - </dependency> - - <dependency> - <groupId>com.google.cloud.bigtable</groupId> - <artifactId>bigtable-protos</artifactId> - <version>${bigtable.version}</version> - </dependency> - - <dependency> - <groupId>com.google.cloud.bigtable</groupId> - <artifactId>bigtable-client-core</artifactId> - <version>${bigtable.version}</version> - </dependency> - - <dependency> - <groupId>com.google.api-client</groupId> - <artifactId>google-api-client</artifactId> - <version>${google-clients.version}</version> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-bigquery</artifactId> - <version>${bigquery.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-clouddebugger</artifactId> - <version>${clouddebugger.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> - 
<artifactId>google-api-services-pubsub</artifactId> - <version>${pubsub.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-storage</artifactId> - <version>${storage.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client</artifactId> - <version>${google-clients.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- Required by com.google.apis:google-api-services-datastore-protobuf, - but the version they depend on differs from our api-client versions --> - <dependency> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client-jackson</artifactId> - <version>${google-clients.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - <!-- Exclude an old version of jackson-core-asl --> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - </exclusions> - <scope>runtime</scope> - </dependency> - - <dependency> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client-jackson2</artifactId> - <version>${google-clients.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client-protobuf</artifactId> - <version>${google-clients.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - <scope>runtime</scope> - </dependency> - - <dependency> - <groupId>com.google.oauth-client</groupId> - <artifactId>google-oauth-client-java6</artifactId> - <version>${google-clients.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.oauth-client</groupId> - <artifactId>google-oauth-client</artifactId> - <version>${google-clients.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive 
dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-datastore-protobuf</artifactId> - <version>${datastore.version}</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - <!-- Exclude old version of api client dependencies. --> - <exclusion> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.api-client</groupId> - <artifactId>google-api-client</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.oauth-client</groupId> - <artifactId>google-oauth-client</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client-jackson</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client-protobuf</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.cloud.bigdataoss</groupId> - <artifactId>gcsio</artifactId> - <version>1.4.3</version> - </dependency> - - <dependency> - <groupId>com.google.cloud.bigdataoss</groupId> - <artifactId>util</artifactId> - <version>1.4.3</version> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <!-- If updating version, please update the javadoc offlineLink --> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava-testlib</artifactId> - <version>${guava.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <version>${protobuf.version}</version> - </dependency> - - <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - <version>${jsr305.version}</version> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <version>${jackson.version}</version> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - <version>${jackson.version}</version> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>${slf4j.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <version>${avro.version}</version> - </dependency> - - <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - <version>1.1.2.1</version> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - <version>1.9</version> - </dependency> - - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - <version>${joda.version}</version> - </dependency> - - <!-- - To use com.google.cloud.dataflow.io.XmlSource: - - 1. 
Explicitly declare the following dependency for the stax2 API. - 2. Include a stax2 implementation on the classpath. One example - is given below as an optional runtime dependency on woodstox-core-asl - --> - <dependency> - <groupId>org.codehaus.woodstox</groupId> - <artifactId>stax2-api</artifactId> - <version>${stax2.version}</version> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>org.codehaus.woodstox</groupId> - <artifactId>woodstox-core-asl</artifactId> - <version>${woodstox.version}</version> - <scope>runtime</scope> - <optional>true</optional> - <exclusions> - <!-- javax.xml.stream:stax-api is included in JDK 1.6+ --> - <exclusion> - <groupId>javax.xml.stream</groupId> - <artifactId>stax-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- - To use com.google.cloud.dataflow.io.AvroSource with XZ-encoded files, - please explicitly declare this dependency to include org.tukaani:xz on - the classpath at runtime. - --> - <dependency> - <groupId>org.tukaani</groupId> - <artifactId>xz</artifactId> - <version>1.5</version> - <scope>runtime</scope> - <optional>true</optional> - </dependency> - - <!-- build dependencies --> - <dependency> - <groupId>com.google.auto.service</groupId> - <artifactId>auto-service</artifactId> - <version>1.0-rc2</version> - <optional>true</optional> - </dependency> - - <!-- test dependencies --> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <version>${hamcrest.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - <version>${slf4j.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <version>1.10.19</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.google.cloud.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-proto-library-all</artifactId> - <version>0.5.160304</version> - <scope>test</scope> - </dependency> - </dependencies> -</project>
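The stax2 and xz entries in the pom above are deliberately optional: a pipeline that reads XML via XmlSource must itself declare the stax2 API (plus an implementation such as woodstox-core-asl), and XZ-encoded AvroSource input likewise needs org.tukaani:xz at runtime. A minimal consuming-side sketch follows, assuming the fluent XmlSource builder methods (from, withRootElement, withRecordElement, withRecordClass) of the SDK's public API; the Record class, element names, and file path are hypothetical:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.XmlSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;

import javax.xml.bind.annotation.XmlRootElement;

public class XmlSourceSketch {
  // Hypothetical record type; JAXB annotations drive the XML-to-object mapping.
  @XmlRootElement(name = "record")
  public static class Record {
    public String name;
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    // Fails at runtime unless org.codehaus.woodstox:stax2-api (and a stax2
    // implementation) is on the classpath, per the pom comment above.
    PCollection<Record> records = p.apply(
        Read.from(XmlSource.<Record>from("/tmp/records.xml")  // hypothetical path
            .withRootElement("records")                       // assumed element names
            .withRecordElement("record")
            .withRecordClass(Record.class)));
    p.run();
  }
}

Because the dependencies are marked optional, they never leak into downstream builds: each consumer opts in only to the formats it actually reads.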
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/Pipeline.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/Pipeline.java deleted file mode 100644 index b166673..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/Pipeline.java +++ /dev/null @@ -1,502 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk; - -import com.google.cloud.dataflow.sdk.coders.CoderRegistry; -import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.runners.TransformHierarchy; -import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; -import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.UserCodeException; -import com.google.cloud.dataflow.sdk.values.PBegin; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; -import com.google.cloud.dataflow.sdk.values.PValue; -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * A {@link Pipeline} manages a directed acyclic graph of {@link PTransform PTransforms}, and the - * {@link PCollection PCollections} that the {@link PTransform}s consume and produce. - * - * <p>A {@link Pipeline} is initialized with a {@link PipelineRunner} that will later - * execute the {@link Pipeline}. - * - * <p>{@link Pipeline Pipelines} are independent, so they can be constructed and executed - * concurrently. - * - * <p>Each {@link Pipeline} is self-contained and isolated from any other - * {@link Pipeline}. The {@link PValue PValues} that are inputs and outputs of each of a - * {@link Pipeline Pipeline's} {@link PTransform PTransforms} are also owned by that - * {@link Pipeline}. A {@link PValue} owned by one {@link Pipeline} can be read only by - * {@link PTransform PTransforms} also owned by that {@link Pipeline}. - * - * <p>Here is a typical example of use: - * <pre> {@code - * // Start by defining the options for the pipeline. - * PipelineOptions options = PipelineOptionsFactory.create(); - * // Then create the pipeline. 
The runner is determined by the options. - * Pipeline p = Pipeline.create(options); - * - * // A root PTransform, like TextIO.Read or Create, gets added - * // to the Pipeline by being applied: - * PCollection<String> lines = - * p.apply(TextIO.Read.from("gs://bucket/dir/file*.txt")); - * - * // A Pipeline can have multiple root transforms: - * PCollection<String> moreLines = - * p.apply(TextIO.Read.from("gs://bucket/other/dir/file*.txt")); - * PCollection<String> yetMoreLines = - * p.apply(Create.of("yet", "more", "lines").withCoder(StringUtf8Coder.of())); - * - * // Further PTransforms can be applied, in an arbitrary (acyclic) graph. - * // Subsequent PTransforms (and intermediate PCollections etc.) are - * // implicitly part of the same Pipeline. - * PCollection<String> allLines = - * PCollectionList.of(lines).and(moreLines).and(yetMoreLines) - * .apply(new Flatten<String>()); - * PCollection<KV<String, Integer>> wordCounts = - * allLines - * .apply(ParDo.of(new ExtractWords())) - * .apply(new Count<String>()); - * PCollection<String> formattedWordCounts = - * wordCounts.apply(ParDo.of(new FormatCounts())); - * formattedWordCounts.apply(TextIO.Write.to("gs://bucket/dir/counts.txt")); - * - * // PTransforms aren't executed when they're applied, rather they're - * // just added to the Pipeline. Once the whole Pipeline of PTransforms - * // is constructed, the Pipeline's PTransforms can be run using a - * // PipelineRunner. The default PipelineRunner executes the Pipeline - * // directly, sequentially, in this one process, which is useful for - * // unit tests and simple experiments: - * p.run(); - * - * } </pre> - */ -public class Pipeline { - private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); - - /** - * Thrown during execution of a {@link Pipeline}, whenever user code within that - * {@link Pipeline} throws an exception. - * - * <p>The original exception thrown by user code may be retrieved via {@link #getCause}. - */ - public static class PipelineExecutionException extends RuntimeException { - /** - * Wraps {@code cause} into a {@link PipelineExecutionException}. - */ - public PipelineExecutionException(Throwable cause) { - super(cause); - } - } - - ///////////////////////////////////////////////////////////////////////////// - // Public operations. - - /** - * Constructs a pipeline from the provided options. - * - * @return The newly created pipeline. - */ - public static Pipeline create(PipelineOptions options) { - Pipeline pipeline = new Pipeline(PipelineRunner.fromOptions(options), options); - LOG.debug("Creating {}", pipeline); - return pipeline; - } - - /** - * Returns a {@link PBegin} owned by this Pipeline. This is useful - * as the input of a root PTransform such as {@link Read} or - * {@link Create}. - */ - public PBegin begin() { - return PBegin.in(this); - } - - /** - * Like {@link #apply(String, PTransform)} but the transform node in the {@link Pipeline} - * graph will be named according to {@link PTransform#getName}. - * - * @see #apply(String, PTransform) - */ - public <OutputT extends POutput> OutputT apply( - PTransform<? super PBegin, OutputT> root) { - return begin().apply(root); - } - - /** - * Adds a root {@link PTransform}, such as {@link Read} or {@link Create}, - * to this {@link Pipeline}. - * - * <p>The node in the {@link Pipeline} graph will use the provided {@code name}. - * This name is used in various places, including the monitoring UI, logging, - * and to stably identify this node in the {@link Pipeline} graph upon update. 
- * - * <p>Alias for {@code begin().apply(name, root)}. - */ - public <OutputT extends POutput> OutputT apply( - String name, PTransform<? super PBegin, OutputT> root) { - return begin().apply(name, root); - } - - /** - * Runs the {@link Pipeline} using its {@link PipelineRunner}. - */ - public PipelineResult run() { - LOG.debug("Running {} via {}", this, runner); - try { - return runner.run(this); - } catch (UserCodeException e) { - // This serves to replace the stack with one that ends here and - // is caused by the caught UserCodeException, thereby splicing - // out all the stack frames in between the PipelineRunner itself - // and where the worker calls into the user's code. - throw new PipelineExecutionException(e.getCause()); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - // Below here are operations that aren't normally called by users. - - /** - * Returns the {@link CoderRegistry} that this {@link Pipeline} uses. - */ - public CoderRegistry getCoderRegistry() { - if (coderRegistry == null) { - coderRegistry = new CoderRegistry(); - coderRegistry.registerStandardCoders(); - } - return coderRegistry; - } - - /** - * Sets the {@link CoderRegistry} that this {@link Pipeline} uses. - */ - public void setCoderRegistry(CoderRegistry coderRegistry) { - this.coderRegistry = coderRegistry; - } - - /** - * A {@link PipelineVisitor} can be passed into - * {@link Pipeline#traverseTopologically} to be called for each of the - * transforms and values in the {@link Pipeline}. - */ - public interface PipelineVisitor { - /** - * Called for each composite transform after all topological predecessors have been visited - * but before any of its component transforms. - */ - public void enterCompositeTransform(TransformTreeNode node); - - /** - * Called for each composite transform after all of its component transforms and their outputs - * have been visited. - */ - public void leaveCompositeTransform(TransformTreeNode node); - - /** - * Called for each primitive transform after all of its topological predecessors - * and inputs have been visited. - */ - public void visitTransform(TransformTreeNode node); - - /** - * Called for each value after the transform that produced the value has been - * visited. - */ - public void visitValue(PValue value, TransformTreeNode producer); - } - - /** - * Invokes the {@link PipelineVisitor PipelineVisitor's} - * {@link PipelineVisitor#visitTransform} and - * {@link PipelineVisitor#visitValue} operations on each of this - * {@link Pipeline Pipeline's} transform and value nodes, in forward - * topological order. - * - * <p>Traversal of the {@link Pipeline} causes {@link PTransform PTransforms} and - * {@link PValue PValues} owned by the {@link Pipeline} to be marked as finished, - * at which point they may no longer be modified. - * - * <p>Typically invoked by {@link PipelineRunner} subclasses. - */ - public void traverseTopologically(PipelineVisitor visitor) { - Set<PValue> visitedValues = new HashSet<>(); - // Visit all the transforms, which should implicitly visit all the values. - transforms.visit(visitor, visitedValues); - if (!visitedValues.containsAll(values)) { - throw new RuntimeException( - "internal error: should have visited all the values " - + "after visiting all the transforms"); - } - } - - /** - * Like {@link #applyTransform(String, PInput, PTransform)} but defaulting to the name - * provided by the {@link PTransform}. 
- */ - public static <InputT extends PInput, OutputT extends POutput> - OutputT applyTransform(InputT input, - PTransform<? super InputT, OutputT> transform) { - return input.getPipeline().applyInternal(transform.getName(), input, transform); - } - - /** - * Applies the given {@code PTransform} to this input {@code InputT} and returns - * its {@code OutputT}. This uses {@code name} to identify this specific application - * of the transform. This name is used in various places, including the monitoring UI, - * logging, and to stably identify this application node in the {@link Pipeline} graph during - * update. - * - * <p>Each {@link PInput} subclass that provides an {@code apply} method should delegate to - * this method to ensure proper registration with the {@link PipelineRunner}. - */ - public static <InputT extends PInput, OutputT extends POutput> - OutputT applyTransform(String name, InputT input, - PTransform<? super InputT, OutputT> transform) { - return input.getPipeline().applyInternal(name, input, transform); - } - - ///////////////////////////////////////////////////////////////////////////// - // Below here are internal operations, never called by users. - - private final PipelineRunner<?> runner; - private final PipelineOptions options; - private final TransformHierarchy transforms = new TransformHierarchy(); - private Collection<PValue> values = new ArrayList<>(); - private Set<String> usedFullNames = new HashSet<>(); - private CoderRegistry coderRegistry; - private Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformApplicationsForTesting = - HashMultimap.create(); - - /** - * @deprecated replaced by {@link #Pipeline(PipelineRunner, PipelineOptions)} - */ - @Deprecated - protected Pipeline(PipelineRunner<?> runner) { - this(runner, PipelineOptionsFactory.create()); - } - - protected Pipeline(PipelineRunner<?> runner, PipelineOptions options) { - this.runner = runner; - this.options = options; - } - - @Override - public String toString() { - return "Pipeline#" + hashCode(); - } - - /** - * Applies a {@link PTransform} to the given {@link PInput}. - * - * @see Pipeline#apply - */ - private <InputT extends PInput, OutputT extends POutput> - OutputT applyInternal(String name, InputT input, - PTransform<? super InputT, OutputT> transform) { - input.finishSpecifying(); - - TransformTreeNode parent = transforms.getCurrent(); - String namePrefix = parent.getFullName(); - String fullName = uniquifyInternal(namePrefix, name); - - boolean nameIsUnique = fullName.equals(buildName(namePrefix, name)); - - if (!nameIsUnique) { - switch (getOptions().getStableUniqueNames()) { - case OFF: - break; - case WARNING: - LOG.warn("Transform {} does not have a stable unique name. " - + "This will prevent updating of pipelines.", fullName); - break; - case ERROR: - throw new IllegalStateException( - "Transform " + fullName + " does not have a stable unique name. 
" - + "This will prevent updating of pipelines."); - default: - throw new IllegalArgumentException( - "Unrecognized value for stable unique names: " + getOptions().getStableUniqueNames()); - } - } - - TransformTreeNode child = - new TransformTreeNode(parent, transform, fullName, input); - parent.addComposite(child); - - transforms.addInput(child, input); - - LOG.debug("Adding {} to {}", transform, this); - try { - transforms.pushNode(child); - transform.validate(input); - OutputT output = runner.apply(transform, input); - transforms.setOutput(child, output); - - AppliedPTransform<?, ?, ?> applied = AppliedPTransform.of( - child.getFullName(), input, output, transform); - transformApplicationsForTesting.put(transform, applied); - // recordAsOutput is a NOOP if already called; - output.recordAsOutput(applied); - verifyOutputState(output, child); - return output; - } finally { - transforms.popNode(); - } - } - - /** - * Returns all producing transforms for the {@link PValue PValues} contained - * in {@code output}. - */ - private List<AppliedPTransform<?, ?, ?>> getProducingTransforms(POutput output) { - List<AppliedPTransform<?, ?, ?>> producingTransforms = new ArrayList<>(); - for (PValue value : output.expand()) { - AppliedPTransform<?, ?, ?> transform = value.getProducingTransformInternal(); - if (transform != null) { - producingTransforms.add(transform); - } - } - return producingTransforms; - } - - /** - * Verifies that the output of a {@link PTransform} is correctly configured in its - * {@link TransformTreeNode} in the {@link Pipeline} graph. - * - * <p>A non-composite {@link PTransform} must have all - * of its outputs registered as produced by that {@link PTransform}. - * - * <p>A composite {@link PTransform} must have all of its outputs - * registered as produced by the contained primitive {@link PTransform PTransforms}. - * They have each had the above check performed already, when - * they were applied, so the only possible failure state is - * that the composite {@link PTransform} has returned a primitive output. - */ - private void verifyOutputState(POutput output, TransformTreeNode node) { - if (!node.isCompositeNode()) { - PTransform<?, ?> thisTransform = node.getTransform(); - List<AppliedPTransform<?, ?, ?>> producingTransforms = getProducingTransforms(output); - for (AppliedPTransform<?, ?, ?> producingTransform : producingTransforms) { - // Using != because object identity indicates that the transforms - // are the same node in the pipeline - if (thisTransform != producingTransform.getTransform()) { - throw new IllegalArgumentException("Output of non-composite transform " - + thisTransform + " is registered as being produced by" - + " a different transform: " + producingTransform); - } - } - } else { - PTransform<?, ?> thisTransform = node.getTransform(); - List<AppliedPTransform<?, ?, ?>> producingTransforms = getProducingTransforms(output); - for (AppliedPTransform<?, ?, ?> producingTransform : producingTransforms) { - // Using == because object identity indicates that the transforms - // are the same node in the pipeline - if (thisTransform == producingTransform.getTransform()) { - throw new IllegalStateException("Output of composite transform " - + thisTransform + " is registered as being produced by it," - + " but the output of every composite transform should be" - + " produced by a primitive transform contained therein."); - } - } - } - } - - /** - * Returns the configured {@link PipelineRunner}. 
- */ - public PipelineRunner<?> getRunner() { - return runner; - } - - /** - * Returns the configured {@link PipelineOptions}. - */ - public PipelineOptions getOptions() { - return options; - } - - /** - * @deprecated this method is no longer compatible with the design of {@link Pipeline}, - * as {@link PTransform PTransforms} can be applied multiple times, with different names - * each time. - */ - @Deprecated - public String getFullNameForTesting(PTransform<?, ?> transform) { - Collection<AppliedPTransform<?, ?, ?>> uses = - transformApplicationsForTesting.get(transform); - Preconditions.checkState(uses.size() > 0, "Unknown transform: " + transform); - Preconditions.checkState(uses.size() <= 1, "Transform used multiple times: " + transform); - return Iterables.getOnlyElement(uses).getFullName(); - } - - /** - * Returns a unique name for a transform with the given prefix (from - * enclosing transforms) and initial name. - * - * <p>For internal use only. - */ - private String uniquifyInternal(String namePrefix, String origName) { - String name = origName; - int suffixNum = 2; - while (true) { - String candidate = buildName(namePrefix, name); - if (usedFullNames.add(candidate)) { - return candidate; - } - // A duplicate! Retry. - name = origName + suffixNum++; - } - } - - /** - * Builds a name from a "/"-delimited prefix and a name. - */ - private String buildName(String namePrefix, String name) { - return namePrefix.isEmpty() ? name : namePrefix + "/" + name; - } - - /** - * Adds the given {@link PValue} to this {@link Pipeline}. - * - * <p>For internal use only. - */ - public void addValueInternal(PValue value) { - this.values.add(value); - LOG.debug("Adding {} to {}", value, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/PipelineResult.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/PipelineResult.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/PipelineResult.java deleted file mode 100644 index 6b9a36b..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/PipelineResult.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk; - -import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException; -import com.google.cloud.dataflow.sdk.runners.AggregatorValues; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; - -/** - * Result of {@link Pipeline#run()}. - */ -public interface PipelineResult { - - /** - * Retrieves the current state of the pipeline execution. - * - * @return the {@link State} representing the state of this pipeline. - */ - State getState(); - - /** - * Retrieves the current value of the provided {@link Aggregator}. - * - * @param aggregator the {@link Aggregator} to retrieve values for. 
- * @return the current values of the {@link Aggregator}, - * which may be empty if there are no values yet. - * @throws AggregatorRetrievalException if the {@link Aggregator} values could not be retrieved. - */ - <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) - throws AggregatorRetrievalException; - - // TODO: method to retrieve error messages. - - /** Named constants for common values for the job state. */ - public enum State { - - /** The job state could not be obtained or was not specified. */ - UNKNOWN(false, false), - - /** The job has been paused, or has not yet started. */ - STOPPED(false, false), - - /** The job is currently running. */ - RUNNING(false, false), - - /** The job has successfully completed. */ - DONE(true, false), - - /** The job has failed. */ - FAILED(true, false), - - /** The job has been explicitly cancelled. */ - CANCELLED(true, false), - - /** The job has been updated. */ - UPDATED(true, true); - - private final boolean terminal; - - private final boolean hasReplacement; - - private State(boolean terminal, boolean hasReplacement) { - this.terminal = terminal; - this.hasReplacement = hasReplacement; - } - - /** - * @return {@code true} if the job state can no longer complete work. - */ - public final boolean isTerminal() { - return terminal; - } - - /** - * @return {@code true} if this job state indicates that a replacement job exists. - */ - public final boolean hasReplacementJob() { - return hasReplacement; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/annotations/Experimental.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/annotations/Experimental.java deleted file mode 100644 index cac2aa8..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/annotations/Experimental.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.annotations; - -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Signifies that a public API (public class, method or field) is subject to - * incompatible changes, or even removal, in a future release. An API bearing - * this annotation is exempt from any compatibility guarantees made by its - * containing library. Note that the presence of this annotation implies nothing - * about the quality or performance of the API in question, only the fact that - * it is not "API-frozen." - * - * <p>It is generally safe for <i>applications</i> to depend on experimental - * APIs, at the cost of some extra work during upgrades. 
However, it is - * generally inadvisable for <i>libraries</i> (which get included on users' - * class paths, outside the library developers' control) to do so. - */ -@Retention(RetentionPolicy.CLASS) -@Target({ - ElementType.ANNOTATION_TYPE, - ElementType.CONSTRUCTOR, - ElementType.FIELD, - ElementType.METHOD, - ElementType.TYPE}) -@Documented -public @interface Experimental { - public Kind value() default Kind.UNSPECIFIED; - - /** - * An enumeration of various kinds of experimental APIs. - */ - public enum Kind { - /** Generic group of experimental APIs. This is the default value. */ - UNSPECIFIED, - - /** Sources and sinks related experimental APIs. */ - SOURCE_SINK, - - /** Auto-scaling related experimental APIs. */ - AUTOSCALING, - - /** Trigger-related experimental APIs. */ - TRIGGER, - - /** Aggregator-related experimental APIs. */ - AGGREGATOR, - - /** Experimental APIs for Coder binary format identifiers. */ - CODER_ENCODING_ID, - - /** State-related experimental APIs. */ - STATE, - - /** Timer-related experimental APIs. */ - TIMERS, - - /** Experimental APIs related to customizing the output time for computed values. */ - OUTPUT_TIME - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/annotations/package-info.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/annotations/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/annotations/package-info.java deleted file mode 100644 index 6c224a6..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/annotations/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -/** - * Defines annotations used across the SDK. - */ -package com.google.cloud.dataflow.sdk.annotations; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AtomicCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AtomicCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AtomicCoder.java deleted file mode 100644 index c4951b4..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AtomicCoder.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.coders; - -import java.util.Collections; -import java.util.List; - -/** - * A {@link Coder} that has no component {@link Coder Coders} or other state. - * - * <p>Note that, unless the behavior is overridden, atomic coders are presumed to be deterministic - * and all instances are considered equal. - * - * @param <T> the type of the values being transcoded - */ -public abstract class AtomicCoder<T> extends DeterministicStandardCoder<T> { - protected AtomicCoder() { } - - @Override - public List<Coder<?>> getCoderArguments() { - return null; - } - - /** - * Returns a list of values contained in the provided example - * value, one per type parameter. If there are no type parameters, - * returns an empty list. - * - * <p>Because {@link AtomicCoder} has no components, always returns an empty list. - * - * @param exampleValue unused, but part of the latent interface expected by - * {@link CoderFactories#fromStaticMethods} - */ - public static <T> List<Object> getInstanceComponents(T exampleValue) { - return Collections.emptyList(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java deleted file mode 100644 index 91efb43..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java +++ /dev/null @@ -1,714 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.sdk.coders; - -import static com.google.cloud.dataflow.sdk.util.Structs.addString; - -import com.google.cloud.dataflow.sdk.util.CloudObject; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.reflect.AvroEncode; -import org.apache.avro.reflect.AvroName; -import org.apache.avro.reflect.AvroSchema; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.reflect.Union; -import org.apache.avro.specific.SpecificData; -import org.apache.avro.util.ClassUtils; -import org.apache.avro.util.Utf8; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.SortedSet; - -import javax.annotation.Nullable; - -/** - * A {@link Coder} using Avro binary format. - * - * <p>Each instance of {@code AvroCoder<T>} encapsulates an Avro schema for objects of type - * {@code T}. - * - * <p>The Avro schema may be provided explicitly via {@link AvroCoder#of(Class, Schema)} or - * omitted via {@link AvroCoder#of(Class)}, in which case it will be inferred - * using Avro's {@link org.apache.avro.reflect.ReflectData}. - * - * <p>For complete details about schema generation and how it can be controlled please see - * the {@link org.apache.avro.reflect} package. - * Only concrete classes with a no-argument constructor can be mapped to Avro records. - * All inherited fields that are not static or transient are included. Fields are not permitted to - * be null unless annotated by {@link Nullable} or a {@link Union} schema - * containing {@code "null"}. - * - * <p>To use, specify the {@code Coder} type on a PCollection: - * <pre> - * {@code - * PCollection<MyCustomElement> records = - * input.apply(...) - * .setCoder(AvroCoder.of(MyCustomElement.class)); - * } - * </pre> - * - * <p>or annotate the element class using {@code @DefaultCoder}. - * <pre><code> - * {@literal @}DefaultCoder(AvroCoder.class) - * public class MyCustomElement { - * ... - * } - * </code></pre> - * - * <p>The implementation attempts to determine if the Avro encoding of the given type will satisfy - * the criteria of {@link Coder#verifyDeterministic} by inspecting both the type and the - * Schema provided or generated by Avro. Only coders that are deterministic can be used in - * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} operations. - * - * @param <T> the type of elements handled by this coder - */ -public class AvroCoder<T> extends StandardCoder<T> { - - /** - * Returns an {@code AvroCoder} instance for the provided element type. 
- * @param <T> the element type - */ - public static <T> AvroCoder<T> of(TypeDescriptor<T> type) { - @SuppressWarnings("unchecked") - Class<T> clazz = (Class<T>) type.getRawType(); - return of(clazz); - } - - /** - * Returns an {@code AvroCoder} instance for the provided element class. - * @param <T> the element type - */ - public static <T> AvroCoder<T> of(Class<T> clazz) { - return new AvroCoder<>(clazz, ReflectData.get().getSchema(clazz)); - } - - /** - * Returns an {@code AvroCoder} instance for the Avro schema. The implicit - * type is GenericRecord. - */ - public static AvroCoder<GenericRecord> of(Schema schema) { - return new AvroCoder<>(GenericRecord.class, schema); - } - - /** - * Returns an {@code AvroCoder} instance for the provided element type - * using the provided Avro schema. - * - * <p>If the type argument is GenericRecord, the schema may be arbitrary. - * Otherwise, the schema must correspond to the type provided. - * - * @param <T> the element type - */ - public static <T> AvroCoder<T> of(Class<T> type, Schema schema) { - return new AvroCoder<>(type, schema); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @JsonCreator - public static AvroCoder<?> of( - @JsonProperty("type") String classType, - @JsonProperty("schema") String schema) throws ClassNotFoundException { - Schema.Parser parser = new Schema.Parser(); - return new AvroCoder(Class.forName(classType), parser.parse(schema)); - } - - public static final CoderProvider PROVIDER = new CoderProvider() { - @Override - public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) { - // This is a downcast from `? super T` to T. However, because - // it comes from a TypeDescriptor<T>, the class object itself - // is the same so the supertype in question shares the same - // generated AvroCoder schema. - @SuppressWarnings("unchecked") - Class<T> rawType = (Class<T>) typeDescriptor.getRawType(); - return AvroCoder.of(rawType); - } - }; - - private final Class<T> type; - private final Schema schema; - - private final List<String> nonDeterministicReasons; - - // Factories allocated by .get() are thread-safe and immutable. - private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get(); - private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get(); - // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe, - // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use - // an inner coder. - private final ThreadLocal<BinaryDecoder> decoder; - private final ThreadLocal<BinaryEncoder> encoder; - private final ThreadLocal<DatumWriter<T>> writer; - private final ThreadLocal<DatumReader<T>> reader; - - protected AvroCoder(Class<T> type, Schema schema) { - this.type = type; - this.schema = schema; - - nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema); - - // Decoder and Encoder start off null for each thread. They are allocated and potentially - // reused inside encode/decode. - this.decoder = new ThreadLocal<>(); - this.encoder = new ThreadLocal<>(); - - // Reader and writer are allocated once per thread and are "final" for thread-local Coder - // instance. 
-    this.reader = new ThreadLocal<DatumReader<T>>() {
-      @Override
-      public DatumReader<T> initialValue() {
-        return createDatumReader();
-      }
-    };
-    this.writer = new ThreadLocal<DatumWriter<T>>() {
-      @Override
-      public DatumWriter<T> initialValue() {
-        return createDatumWriter();
-      }
-    };
-  }
-
-  /**
-   * The encoding identifier is designed to support evolution as per the design of Avro.
-   * In order to use this class effectively, carefully read the Avro
-   * documentation at
-   * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution">Schema Resolution</a>
-   * to ensure that the old and new schema <i>match</i>.
-   *
-   * <p>In particular, this encoding identifier is guaranteed to be the same for {@code AvroCoder}
-   * instances of the same principal class, and otherwise distinct. The schema is not included
-   * in the identifier.
-   *
-   * <p>When modifying a class to be encoded as Avro, here are some guidelines; see the above link
-   * for greater detail.
-   *
-   * <ul>
-   * <li>Avoid changing field names.
-   * <li>Never remove a <code>required</code> field.
-   * <li>Only add <code>optional</code> fields, with sensible defaults.
-   * <li>When changing the type of a field, consult the Avro documentation to ensure the new and
-   *     old types are interchangeable.
-   * </ul>
-   *
-   * <p>Code consuming messages of this class should be prepared to support <i>all</i> versions of
-   * the class until it is certain that no remaining serialized instances exist.
-   *
-   * <p>If backwards-incompatible changes must be made, the best recourse is to change the name
-   * of your class.
-   */
-  @Override
-  public String getEncodingId() {
-    return type.getName();
-  }
-
-  /**
-   * Returns the type this coder encodes/decodes.
-   */
-  public Class<T> getType() {
-    return type;
-  }
-
-  private Object writeReplace() {
-    // When serialized by Java, instances of AvroCoder should be replaced by
-    // a SerializedAvroCoderProxy.
-    return new SerializedAvroCoderProxy<>(type, schema.toString());
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
-    BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
-    // Save the potentially-new instance for reuse later.
-    encoder.set(encoderInstance);
-    writer.get().write(value, encoderInstance);
-    // The direct binary encoder does not buffer any data and need not be flushed.
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
-    BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
-    // Save the potentially-new instance for later.
-    decoder.set(decoderInstance);
-    return reader.get().read(null, decoderInstance);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    addString(result, "type", type.getName());
-    addString(result, "schema", schema.toString());
-    return result;
-  }
-
-  /**
-   * @throws NonDeterministicException when the type may not be deterministically
-   * encoded using the given {@link Schema}, the {@code directBinaryEncoder}, and the
-   * {@link ReflectDatumWriter} or {@link GenericDatumWriter}.
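-   *
-   * <p>For example, a type containing a plain {@code java.util.HashMap} field will fail this
-   * check, since the checker requires a {@code java.util.SortedMap} for deterministic ordering.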
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    if (!nonDeterministicReasons.isEmpty()) {
-      throw new NonDeterministicException(this, nonDeterministicReasons);
-    }
-  }
-
-  /**
-   * Returns a new {@link DatumReader} that can be used to read from an Avro file directly. Assumes
-   * the schema used to read is the same as the schema that was used when writing.
-   *
-   * @deprecated For {@code AvroCoder} internal use only.
-   */
-  // TODO: once we can remove this deprecated function, inline in constructor.
-  @Deprecated
-  public DatumReader<T> createDatumReader() {
-    if (type.equals(GenericRecord.class)) {
-      return new GenericDatumReader<>(schema);
-    } else {
-      return new ReflectDatumReader<>(schema);
-    }
-  }
-
-  /**
-   * Returns a new {@link DatumWriter} that can be used to write to an Avro file directly.
-   *
-   * @deprecated For {@code AvroCoder} internal use only.
-   */
-  // TODO: once we can remove this deprecated function, inline in constructor.
-  @Deprecated
-  public DatumWriter<T> createDatumWriter() {
-    if (type.equals(GenericRecord.class)) {
-      return new GenericDatumWriter<>(schema);
-    } else {
-      return new ReflectDatumWriter<>(schema);
-    }
-  }
-
-  /**
-   * Returns the schema used by this coder.
-   */
-  public Schema getSchema() {
-    return schema;
-  }
-
-  /**
-   * Proxy to use in place of serializing the {@link AvroCoder}. This allows the fields
-   * to remain final.
-   */
-  private static class SerializedAvroCoderProxy<T> implements Serializable {
-    private final Class<T> type;
-    private final String schemaStr;
-
-    public SerializedAvroCoderProxy(Class<T> type, String schemaStr) {
-      this.type = type;
-      this.schemaStr = schemaStr;
-    }
-
-    private Object readResolve() {
-      // When deserialized, instances of this object should be replaced by
-      // constructing an AvroCoder.
-      Schema.Parser parser = new Schema.Parser();
-      return new AvroCoder<T>(type, parser.parse(schemaStr));
-    }
-  }
-
-  /**
-   * Helper class encapsulating the various pieces of state maintained by the
-   * recursive walk used for checking if the encoding will be deterministic.
-   */
-  private static class AvroDeterminismChecker {
-
-    // Reasons that the original type is not deterministic. This accumulates
-    // the actual output.
-    private List<String> reasons = new ArrayList<>();
-
-    // Types that are currently "open". Used to make sure we don't have any
-    // recursive types. Note that we assume that all occurrences of a given type
-    // are equal, rather than tracking pairs of type + schema.
-    private Set<TypeDescriptor<?>> activeTypes = new HashSet<>();
-
-    // Similarly to how we record active types, we record the schemas we visit
-    // to make sure we don't encounter recursive fields.
-    private Set<Schema> activeSchemas = new HashSet<>();
-
-    /**
-     * Reports an error in the current context.
-     */
-    private void reportError(String context, String fmt, Object... args) {
-      String message = String.format(fmt, args);
-      reasons.add(context + ": " + message);
-    }
-
-    /**
-     * Classes that are serialized by Avro as a String include
-     * <ul>
-     * <li>Subtypes of CharSequence (including String, Avro's mutable Utf8, etc.)
-     * <li>Several predefined classes (BigDecimal, BigInteger, URI, URL)
-     * <li>Classes annotated with @Stringable (Avro uses their #toString() and a String
-     *     constructor)
-     * </ul>
-     *
-     * <p>Rather than determine which of these cases are deterministic, we list some classes
-     * that definitely are, and treat any others as non-deterministic.
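-     *
-     * <p>So, for example, a field of type {@code java.math.BigDecimal} is accepted, while a
-     * field of any other {@code @Stringable}-annotated class is reported as potentially
-     * non-deterministic.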
-     */
-    private static final Set<Class<?>> DETERMINISTIC_STRINGABLE_CLASSES = new HashSet<>();
-    static {
-      // CharSequences:
-      DETERMINISTIC_STRINGABLE_CLASSES.add(String.class);
-      DETERMINISTIC_STRINGABLE_CLASSES.add(Utf8.class);
-
-      // Explicitly Stringable:
-      DETERMINISTIC_STRINGABLE_CLASSES.add(java.math.BigDecimal.class);
-      DETERMINISTIC_STRINGABLE_CLASSES.add(java.math.BigInteger.class);
-      DETERMINISTIC_STRINGABLE_CLASSES.add(java.net.URI.class);
-      DETERMINISTIC_STRINGABLE_CLASSES.add(java.net.URL.class);
-
-      // Classes annotated with @Stringable:
-    }
-
-    /**
-     * Returns true if the given type token is a subtype of <i>any</i> of the listed parents.
-     */
-    private static boolean isSubtypeOf(TypeDescriptor<?> type, Class<?>... parents) {
-      for (Class<?> parent : parents) {
-        if (type.isSubtypeOf(TypeDescriptor.of(parent))) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    protected AvroDeterminismChecker() {}
-
-    // The entry point for the check. Should not be called recursively.
-    public List<String> check(TypeDescriptor<?> type, Schema schema) {
-      recurse(type.getRawType().getName(), type, schema);
-      return reasons;
-    }
-
-    // This is the method that should be called recursively. It sets up the path
-    // and visited types correctly.
-    private void recurse(String context, TypeDescriptor<?> type, Schema schema) {
-      if (type.getRawType().isAnnotationPresent(AvroSchema.class)) {
-        reportError(context, "Custom schemas are not supported -- remove @AvroSchema.");
-        return;
-      }
-
-      if (!activeTypes.add(type)) {
-        reportError(context, "%s appears recursively", type);
-        return;
-      }
-
-      // If the record isn't a true class, but rather a GenericRecord, SpecificRecord, etc.
-      // with a specified schema, then we need to make the decision based on the generated
-      // implementations.
-      if (isSubtypeOf(type, IndexedRecord.class)) {
-        checkIndexedRecord(context, schema, null);
-      } else {
-        doCheck(context, type, schema);
-      }
-
-      activeTypes.remove(type);
-    }
-
-    private void doCheck(String context, TypeDescriptor<?> type, Schema schema) {
-      switch (schema.getType()) {
-        case ARRAY:
-          checkArray(context, type, schema);
-          break;
-        case ENUM:
-          // Enums should be deterministic, since they depend only on the ordinal.
-          break;
-        case FIXED:
-          // Depending on the implementation of GenericFixed, we don't know how
-          // the given field will be encoded. So, we assume that it isn't
-          // deterministic.
-          reportError(context, "FIXED encodings are not guaranteed to be deterministic");
-          break;
-        case MAP:
-          checkMap(context, type, schema);
-          break;
-        case RECORD:
-          checkRecord(type, schema);
-          break;
-        case UNION:
-          checkUnion(context, type, schema);
-          break;
-        case STRING:
-          checkString(context, type);
-          break;
-        case BOOLEAN:
-        case BYTES:
-        case DOUBLE:
-        case INT:
-        case FLOAT:
-        case LONG:
-        case NULL:
-          // For types that Avro encodes using one of the above primitives, we assume they are
-          // deterministic.
-          break;
-        default:
-          // In any other case (e.g., new types added to Avro) we cautiously report the type
-          // as potentially non-deterministic.
-          reportError(context, "Unknown schema type %s may be non-deterministic", schema.getType());
-          break;
-      }
-    }
-
-    private void checkString(String context, TypeDescriptor<?> type) {
-      // For types that are encoded as strings, we need to make sure they're in an approved
-      // whitelist. For other types that are annotated @Stringable, Avro will just use the
-      // #toString() method, which has no guarantee of determinism.
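-      // For instance, a hypothetical @Stringable class whose #toString() output depended on
-      // locale or object identity would encode equal values differently, so it is rejected.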
-      if (!DETERMINISTIC_STRINGABLE_CLASSES.contains(type.getRawType())) {
-        reportError(context, "%s may not have deterministic #toString()", type);
-      }
-    }
-
-    private static final Schema AVRO_NULL_SCHEMA = Schema.create(Schema.Type.NULL);
-
-    private void checkUnion(String context, TypeDescriptor<?> type, Schema schema) {
-      final List<Schema> unionTypes = schema.getTypes();
-
-      if (!type.getRawType().isAnnotationPresent(Union.class)) {
-        // First check for a @Nullable field, which shows up as a union of the field type and null.
-        if (unionTypes.size() == 2 && unionTypes.contains(AVRO_NULL_SCHEMA)) {
-          // Find the Schema that is not NULL and recursively check that it is deterministic.
-          Schema nullableFieldSchema = unionTypes.get(0).equals(AVRO_NULL_SCHEMA)
-              ? unionTypes.get(1) : unionTypes.get(0);
-          doCheck(context, type, nullableFieldSchema);
-          return;
-        }
-
-        // Otherwise report a schema error.
-        reportError(context, "Expected type %s to have @Union annotation", type);
-        return;
-      }
-
-      // Errors associated with this union will use the base class as their context.
-      String baseClassContext = type.getRawType().getName();
-
-      // For a union, we need to make sure that each possible instantiation is deterministic.
-      for (Schema concrete : unionTypes) {
-        @SuppressWarnings("unchecked")
-        TypeDescriptor<?> unionType = TypeDescriptor.of(ReflectData.get().getClass(concrete));
-
-        recurse(baseClassContext, unionType, concrete);
-      }
-    }
-
-    private void checkRecord(TypeDescriptor<?> type, Schema schema) {
-      // For a record, we want to make sure that all the fields are deterministic.
-      Class<?> clazz = type.getRawType();
-      for (org.apache.avro.Schema.Field fieldSchema : schema.getFields()) {
-        Field field = getField(clazz, fieldSchema.name());
-        String fieldContext = field.getDeclaringClass().getName() + "#" + field.getName();
-
-        if (field.isAnnotationPresent(AvroEncode.class)) {
-          reportError(fieldContext,
-              "Custom encoders may be non-deterministic -- remove @AvroEncode");
-          continue;
-        }
-
-        if (!IndexedRecord.class.isAssignableFrom(field.getType())
-            && field.isAnnotationPresent(AvroSchema.class)) {
-          // TODO: We should be able to support custom schemas on POJO fields, but we shouldn't
-          // need to, so we just allow it in the case of IndexedRecords.
-          reportError(fieldContext,
-              "Custom schemas are only supported for subtypes of IndexedRecord.");
-          continue;
-        }
-
-        TypeDescriptor<?> fieldType = type.resolveType(field.getGenericType());
-        recurse(fieldContext, fieldType, fieldSchema.schema());
-      }
-    }
-
-    private void checkIndexedRecord(String context, Schema schema,
-        @Nullable String specificClassStr) {
-
-      if (!activeSchemas.add(schema)) {
-        reportError(context, "%s appears recursively", schema.getName());
-        return;
-      }
-
-      switch (schema.getType()) {
-        case ARRAY:
-          // Generic Records use GenericData.Array to implement arrays, which is
-          // essentially an ArrayList, and therefore ordering is deterministic.
-          // The array is thus deterministic if the elements are deterministic.
-          checkIndexedRecord(context, schema.getElementType(), null);
-          break;
-        case ENUM:
-          // Enums are deterministic because they encode as a single integer.
-          break;
-        case FIXED:
-          // In the case of GenericRecords, FIXED is deterministic because it
-          // encodes/decodes as a byte[].
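-          // (Contrast with the reflect-based path in doCheck, where FIXED is conservatively
-          // reported, since a custom GenericFixed implementation may encode arbitrarily.)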
-          break;
-        case MAP:
-          reportError(context,
-              "GenericRecord and SpecificRecords use a HashMap to represent MAPs,"
-              + " so they are non-deterministic");
-          break;
-        case RECORD:
-          for (org.apache.avro.Schema.Field field : schema.getFields()) {
-            checkIndexedRecord(
-                schema.getName() + "." + field.name(),
-                field.schema(),
-                field.getProp(SpecificData.CLASS_PROP));
-          }
-          break;
-        case STRING:
-          // GenericDatumWriter#findStringClass will use a CharSequence or a String
-          // for each string, so it is deterministic.
-
-          // SpecificCompiler#getStringType will use java.lang.String, org.apache.avro.util.Utf8,
-          // or java.lang.CharSequence, unless SpecificData.CLASS_PROP overrides that.
-          if (specificClassStr != null) {
-            Class<?> specificClass;
-            try {
-              specificClass = ClassUtils.forName(specificClassStr);
-              if (!DETERMINISTIC_STRINGABLE_CLASSES.contains(specificClass)) {
-                reportError(context, "Specific class %s is not known to be deterministic",
-                    specificClassStr);
-              }
-            } catch (ClassNotFoundException e) {
-              reportError(context, "Specific class %s is not known to be deterministic",
-                  specificClassStr);
-            }
-          }
-          break;
-        case UNION:
-          for (org.apache.avro.Schema subschema : schema.getTypes()) {
-            checkIndexedRecord(subschema.getName(), subschema, null);
-          }
-          break;
-        case BOOLEAN:
-        case BYTES:
-        case DOUBLE:
-        case INT:
-        case FLOAT:
-        case LONG:
-        case NULL:
-          // For types that Avro encodes using one of the above primitives, we assume they are
-          // deterministic.
-          break;
-        default:
-          reportError(context, "Unknown schema type %s may be non-deterministic", schema.getType());
-          break;
-      }
-
-      activeSchemas.remove(schema);
-    }
-
-    private void checkMap(String context, TypeDescriptor<?> type, Schema schema) {
-      if (!isSubtypeOf(type, SortedMap.class)) {
-        reportError(context, "%s may not be deterministically ordered", type);
-      }
-
-      // Avro (currently) asserts that all keys are strings.
-      // In case that changes, we double-check that the key was a string:
-      Class<?> keyType = type.resolveType(Map.class.getTypeParameters()[0]).getRawType();
-      if (!String.class.equals(keyType)) {
-        reportError(context, "map keys should be Strings, but the key type was %s", keyType);
-      }
-
-      recurse(context,
-          type.resolveType(Map.class.getTypeParameters()[1]),
-          schema.getValueType());
-    }
-
-    private void checkArray(String context, TypeDescriptor<?> type, Schema schema) {
-      TypeDescriptor<?> elementType = null;
-      if (type.isArray()) {
-        // The type is an array (with ordering) -> deterministic iff the element is deterministic.
-        elementType = type.getComponentType();
-      } else if (isSubtypeOf(type, Collection.class)) {
-        if (isSubtypeOf(type, List.class, SortedSet.class)) {
-          // Ordered collection -> deterministic iff the element is deterministic.
-          elementType = type.resolveType(Collection.class.getTypeParameters()[0]);
-        } else {
-          // Not an ordered collection -> not deterministic.
-          reportError(context, "%s may not be deterministically ordered", type);
-          return;
-        }
-      } else {
-        // If it was an unknown type encoded as an array, be conservative and assume
-        // that we don't know anything about the order.
-        reportError(context, "encoding %s as an ARRAY was unexpected", type);
-        return;
-      }
-
-      // If we get here, it's either a deterministically-ordered Collection, or
-      // an array. Either way, the type is deterministic iff the element type is
-      // deterministic.
-      recurse(context, elementType, schema.getElementType());
-    }
-
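-    // For illustration: a field declared as List<String> or String[] passes the ordering
-    // test in checkArray, while a plain Set<String> (e.g. a HashSet) is reported as
-    // potentially unordered.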
-    /**
-     * Extracts a field from a class. We need to look at the declared fields so that we can
-     * see private fields. We may need to walk up the class hierarchy, since the field may
-     * be declared in a parent class.
-     */
-    private static Field getField(Class<?> clazz, String name) {
-      // Remember the original class so the error message below does not report "null"
-      // after the loop has walked off the top of the hierarchy.
-      Class<?> original = clazz;
-      while (clazz != null) {
-        for (Field field : clazz.getDeclaredFields()) {
-          AvroName avroName = field.getAnnotation(AvroName.class);
-          if (avroName != null && name.equals(avroName.value())) {
-            return field;
-          } else if (avroName == null && name.equals(field.getName())) {
-            return field;
-          }
-        }
-        clazz = clazz.getSuperclass();
-      }
-
-      throw new IllegalArgumentException(
-          "Unable to get field " + name + " from class " + original);
-    }
-  }
-}
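For reference, a minimal round-trip sketch against the API above. It assumes a hypothetical MyCustomElement POJO (a concrete class with a no-argument constructor, as the class-level javadoc requires) and uses only the of/verifyDeterministic/encode/decode methods shown in this file.

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class AvroCoderRoundTrip {

  // Hypothetical element type, mirroring the class-level javadoc example:
  // a concrete class with a no-argument constructor and non-null fields.
  public static class MyCustomElement {
    public String name = "";
    public int count;
  }

  public static void main(String[] args) throws Exception {
    // The schema is inferred from the class via Avro's ReflectData.
    AvroCoder<MyCustomElement> coder = AvroCoder.of(MyCustomElement.class);

    // Throws NonDeterministicException if the inferred schema cannot be encoded
    // deterministically (e.g. if the class had a plain HashMap field).
    coder.verifyDeterministic();

    // Context.OUTER: the value occupies the whole stream, so no length prefix is used.
    MyCustomElement element = new MyCustomElement();
    element.name = "example";
    element.count = 42;
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(element, out, Coder.Context.OUTER);

    // Decode from the same bytes and observe the copied values.
    MyCustomElement copy =
        coder.decode(new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);
    System.out.println(copy.name + " " + copy.count);
  }
}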
