// flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
// (new file added in commit e9bf13d8; reconstructed from a whitespace-mangled diff dump)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.scala

import java.io._

import org.apache.flink.util.TestLogger
import org.junit.Assert
import org.junit.Test

class ScalaShellLocalStartupITCase extends TestLogger {

  /**
   * Tests the Flink Scala shell with a local cluster setup (the "local"
   * argument — presumably the same code path the startup script in the bin
   * folder uses; confirm against start-scala-shell.sh).
   *
   * A scripted REPL session runs a small job that counts elements via an
   * IntCounter accumulator; the test then checks the captured stdout for the
   * expected results and for the absence of error markers.
   */
  @Test
  def testLocalCluster(): Unit = {
    // The exact text a user would type into the shell, terminated by ":q".
    // NOTE: this is runtime input fed to the REPL — keep its content unchanged.
    val input: String =
      """
        |import org.apache.flink.api.common.functions.RichMapFunction
        |import org.apache.flink.api.java.io.PrintingOutputFormat
        |import org.apache.flink.api.common.accumulators.IntCounter
        |import org.apache.flink.configuration.Configuration
        |
        |val els = env.fromElements("foobar","barfoo")
        |val mapped = els.map{
        |  new RichMapFunction[String, String]() {
        |    var intCounter: IntCounter = _
        |    override def open(conf: Configuration): Unit = {
        |      intCounter = getRuntimeContext.getIntCounter("intCounter")
        |    }
        |
        |    def map(element: String): String = {
        |      intCounter.add(1)
        |      element
        |    }
        |  }
        |}
        |mapped.output(new PrintingOutputFormat())
        |val executionResult = env.execute("Test Job")
        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
        |
        |:q
      """.stripMargin

    val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
    val oldOut: PrintStream = System.out

    // Capture everything the shell prints so we can assert on it below.
    System.setOut(new PrintStream(baos))

    val output: String =
      try {
        // Start the Flink Scala shell against a local cluster, feeding it the
        // scripted session instead of interactive console input.
        FlinkShell.bufferedReader = Some(in)
        FlinkShell.main(Array("local"))
        baos.flush()
        baos.toString
      } finally {
        // Restore stdout even if the shell run throws, so the test runner
        // (and any later tests) keep a working System.out.
        System.setOut(oldOut)
      }

    // Two input elements were mapped, so the accumulator must report 2.
    Assert.assertTrue(output.contains("IntCounter: 2"))
    Assert.assertTrue(output.contains("foobar"))
    Assert.assertTrue(output.contains("barfoo"))

    // The session must complete without any error markers in the output.
    Assert.assertFalse(output.contains("failed"))
    Assert.assertFalse(output.contains("Error"))
    Assert.assertFalse(output.contains("ERROR"))
    Assert.assertFalse(output.contains("Exception"))
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-scala-shell/start-script/start-scala-shell.sh ---------------------------------------------------------------------- diff --git a/flink-scala-shell/start-script/start-scala-shell.sh b/flink-scala-shell/start-script/start-scala-shell.sh new file mode 100644 index 0000000..fd85897 --- /dev/null +++ b/flink-scala-shell/start-script/start-scala-shell.sh @@ -0,0 +1,86 @@ +#!/bin/bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# from scala-lang 2.10.4 + +# restore stty settings (echo in particular) +function restoreSttySettings() { + if [[ -n $SCALA_RUNNER_DEBUG ]]; then + echo "restoring stty:" + echo "$saved_stty" + fi + stty $saved_stty + saved_stty="" +} + +function onExit() { + [[ "$saved_stty" != "" ]] && restoreSttySettings + exit $scala_exit_status +} + + +# to reenable echo if we are interrupted before completing. +trap onExit INT +# save terminal settings +saved_stty=$(stty -g 2>/dev/null) +# clear on error so we don't later try to restore them +if [[ ! $? 
]]; then + saved_stty="" +fi + + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +FLINK_CLASSPATH=`constructFlinkClassPath` + +# https://issues.scala-lang.org/browse/SI-6502, cant load external jars interactively +# in scala shell since 2.10, has to be done at startup +# checks arguments for additional classpath and adds it to the "standard classpath" + +EXTERNAL_LIB_FOUND=false +for ((i=1;i<=$#;i++)) +do + if [[ ${!i} = "-a" || ${!i} = "--addclasspath" ]] + then + EXTERNAL_LIB_FOUND=true + + #adding to classpath + k=$((i+1)) + j=$((k+1)) + echo " " + echo "Additional classpath:${!k}" + echo " " + EXT_CLASSPATH="${!k}" + FLINK_CLASSPATH="$FLINK_CLASSPATH:${!k}" + set -- "${@:1:$((i-1))}" "${@:j}" + fi +done + +if ${EXTERNAL_LIB_FOUND} +then + java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH" +else + java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@ +fi + +#restore echo +onExit http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/pom.xml b/flink-staging/flink-avro/pom.xml deleted file mode 100644 index 9e0e868..0000000 --- a/flink-staging/flink-avro/pom.xml +++ /dev/null @@ -1,205 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-staging</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-avro</artifactId> - <name>flink-avro</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - </dependency> - - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - <!-- version is derived from base module --> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>create-test-dependency</id> - <phase>process-test-classes</phase> - <goals> - <goal>single</goal> 
- </goals> - <configuration> - <archive> - <manifest> - <mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass> - </manifest> - </archive> - <finalName>maven</finalName> - <attach>false</attach> - <descriptors> - <descriptor>src/test/assembly/test-assembly.xml</descriptor> - </descriptors> - </configuration> - </execution> - </executions> - </plugin> - <!--Remove the AvroExternalJarProgram code from the test-classes directory since it musn't be in the - classpath when running the tests to actually test whether the user code class loader - is properly used.--> - <plugin> - <artifactId>maven-clean-plugin</artifactId> - <version>2.5</version><!--$NO-MVN-MAN-VER$--> - <executions> - <execution> - <id>remove-avroexternalprogram</id> - <phase>process-test-classes</phase> - <goals> - <goal>clean</goal> - </goals> - <configuration> - <excludeDefaultDirectories>true</excludeDefaultDirectories> - <filesets> - <fileset> - <directory>${project.build.testOutputDirectory}</directory> - <includes> - <include>**/testjar/*.class</include> - </includes> - </fileset> - </filesets> - </configuration> - </execution> - </executions> - </plugin> - <!-- Generate Test class from avro schema --> - <plugin> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <version>1.7.7</version> - <executions> - <execution> - <phase>generate-sources</phase> - <goals> - <goal>schema</goal> - </goals> - <configuration> - <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory> - <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - - <pluginManagement> - <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. 
It has no influence on the Maven build itself.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - <versionRange>[2.4,)</versionRange> - <goals> - <goal>single</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-clean-plugin</artifactId> - <versionRange>[1,)</versionRange> - <goals> - <goal>clean</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <versionRange>[1.7.7,)</versionRange> - <goals> - <goal>schema</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java deleted file mode 100644 index 59da4cb..0000000 --- a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.avro.io.Decoder; -import org.apache.avro.util.Utf8; - - -public class DataInputDecoder extends Decoder { - - private final Utf8 stringDecoder = new Utf8(); - - private DataInput in; - - public void setIn(DataInput in) { - this.in = in; - } - - // -------------------------------------------------------------------------------------------- - // primitives - // -------------------------------------------------------------------------------------------- - - @Override - public void readNull() {} - - - @Override - public boolean readBoolean() throws IOException { - return in.readBoolean(); - } - - @Override - public int readInt() throws IOException { - return in.readInt(); - } - - @Override - public long readLong() throws IOException { - return in.readLong(); - } - - @Override - public float readFloat() throws IOException { - return in.readFloat(); - } - - @Override - public double readDouble() throws IOException { - return in.readDouble(); - } - - @Override - public int readEnum() throws IOException { - return readInt(); - } - - // -------------------------------------------------------------------------------------------- - // bytes - 
// -------------------------------------------------------------------------------------------- - - @Override - public void readFixed(byte[] bytes, int start, int length) throws IOException { - in.readFully(bytes, start, length); - } - - @Override - public ByteBuffer readBytes(ByteBuffer old) throws IOException { - int length = readInt(); - ByteBuffer result; - if (old != null && length <= old.capacity() && old.hasArray()) { - result = old; - result.clear(); - } else { - result = ByteBuffer.allocate(length); - } - in.readFully(result.array(), result.arrayOffset() + result.position(), length); - result.limit(length); - return result; - } - - - @Override - public void skipFixed(int length) throws IOException { - skipBytes(length); - } - - @Override - public void skipBytes() throws IOException { - int num = readInt(); - skipBytes(num); - } - - // -------------------------------------------------------------------------------------------- - // strings - // -------------------------------------------------------------------------------------------- - - - @Override - public Utf8 readString(Utf8 old) throws IOException { - int length = readInt(); - Utf8 result = (old != null ? 
old : new Utf8()); - result.setByteLength(length); - - if (length > 0) { - in.readFully(result.getBytes(), 0, length); - } - - return result; - } - - @Override - public String readString() throws IOException { - return readString(stringDecoder).toString(); - } - - @Override - public void skipString() throws IOException { - int len = readInt(); - skipBytes(len); - } - - // -------------------------------------------------------------------------------------------- - // collection types - // -------------------------------------------------------------------------------------------- - - @Override - public long readArrayStart() throws IOException { - return readVarLongCount(in); - } - - @Override - public long arrayNext() throws IOException { - return readVarLongCount(in); - } - - @Override - public long skipArray() throws IOException { - return readVarLongCount(in); - } - - @Override - public long readMapStart() throws IOException { - return readVarLongCount(in); - } - - @Override - public long mapNext() throws IOException { - return readVarLongCount(in); - } - - @Override - public long skipMap() throws IOException { - return readVarLongCount(in); - } - - // -------------------------------------------------------------------------------------------- - // union - // -------------------------------------------------------------------------------------------- - - @Override - public int readIndex() throws IOException { - return readInt(); - } - - // -------------------------------------------------------------------------------------------- - // utils - // -------------------------------------------------------------------------------------------- - - private void skipBytes(int num) throws IOException { - while (num > 0) { - num -= in.skipBytes(num); - } - } - - public static long readVarLongCount(DataInput in) throws IOException { - long value = in.readUnsignedByte(); - - if ((value & 0x80) == 0) { - return value; - } - else { - long curr; - int shift = 7; - value = 
value & 0x7f; - while (((curr = in.readUnsignedByte()) & 0x80) != 0){ - value |= (curr & 0x7f) << shift; - shift += 7; - } - value |= curr << shift; - return value; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java deleted file mode 100644 index 0102cc1..0000000 --- a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.avro; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.avro.io.Encoder; -import org.apache.avro.util.Utf8; - - -public final class DataOutputEncoder extends Encoder implements java.io.Serializable { - - private static final long serialVersionUID = 1L; - - private DataOutput out; - - - public void setOut(DataOutput out) { - this.out = out; - } - - - @Override - public void flush() throws IOException {} - - // -------------------------------------------------------------------------------------------- - // primitives - // -------------------------------------------------------------------------------------------- - - @Override - public void writeNull() {} - - - @Override - public void writeBoolean(boolean b) throws IOException { - out.writeBoolean(b); - } - - @Override - public void writeInt(int n) throws IOException { - out.writeInt(n); - } - - @Override - public void writeLong(long n) throws IOException { - out.writeLong(n); - } - - @Override - public void writeFloat(float f) throws IOException { - out.writeFloat(f); - } - - @Override - public void writeDouble(double d) throws IOException { - out.writeDouble(d); - } - - @Override - public void writeEnum(int e) throws IOException { - out.writeInt(e); - } - - - // -------------------------------------------------------------------------------------------- - // bytes - // -------------------------------------------------------------------------------------------- - - @Override - public void writeFixed(byte[] bytes, int start, int len) throws IOException { - out.write(bytes, start, len); - } - - @Override - public void writeBytes(byte[] bytes, int start, int len) throws IOException { - out.writeInt(len); - if (len > 0) { - out.write(bytes, start, len); - } - } - - @Override - public void writeBytes(ByteBuffer bytes) throws IOException { - int num = bytes.remaining(); - out.writeInt(num); - - if (num > 0) { - 
writeFixed(bytes); - } - } - - // -------------------------------------------------------------------------------------------- - // strings - // -------------------------------------------------------------------------------------------- - - @Override - public void writeString(String str) throws IOException { - byte[] bytes = Utf8.getBytesFor(str); - writeBytes(bytes, 0, bytes.length); - } - - @Override - public void writeString(Utf8 utf8) throws IOException { - writeBytes(utf8.getBytes(), 0, utf8.getByteLength()); - - } - - // -------------------------------------------------------------------------------------------- - // collection types - // -------------------------------------------------------------------------------------------- - - @Override - public void writeArrayStart() {} - - @Override - public void setItemCount(long itemCount) throws IOException { - if (itemCount > 0) { - writeVarLongCount(out, itemCount); - } - } - - @Override - public void startItem() {} - - @Override - public void writeArrayEnd() throws IOException { - // write a single byte 0, shortcut for a var-length long of 0 - out.write(0); - } - - @Override - public void writeMapStart() {} - - @Override - public void writeMapEnd() throws IOException { - // write a single byte 0, shortcut for a var-length long of 0 - out.write(0); - } - - // -------------------------------------------------------------------------------------------- - // union - // -------------------------------------------------------------------------------------------- - - @Override - public void writeIndex(int unionIndex) throws IOException { - out.writeInt(unionIndex); - } - - // -------------------------------------------------------------------------------------------- - // utils - // -------------------------------------------------------------------------------------------- - - - public static void writeVarLongCount(DataOutput out, long val) throws IOException { - if (val < 0) { - throw new IOException("Illegal count 
(must be non-negative): " + val); - } - - while ((val & ~0x7FL) != 0) { - out.write(((int) val) | 0x80); - val >>>= 7; - } - out.write((int) val); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java deleted file mode 100644 index 709c4f1..0000000 --- a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.avro; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.avro.file.SeekableInput; -import org.apache.flink.core.fs.FSDataInputStream; - - -/** - * Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well) - * - * The wrapper keeps track of the position in the data stream. 
- */ -public class FSDataInputStreamWrapper implements Closeable, SeekableInput { - private final FSDataInputStream stream; - private long pos; - private long len; - - public FSDataInputStreamWrapper(FSDataInputStream stream, long len) { - this.stream = stream; - this.pos = 0; - this.len = len; - } - - public long length() throws IOException { - return this.len; - } - - public int read(byte[] b, int off, int len) throws IOException { - int read; - read = stream.read(b, off, len); - pos += read; - return read; - } - - public void seek(long p) throws IOException { - stream.seek(p); - pos = p; - } - - public long tell() throws IOException { - return pos; - } - - public void close() throws IOException { - stream.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java deleted file mode 100644 index 6affeec..0000000 --- a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.io.avro.example; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.io.GenericInputFormat; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; - -@SuppressWarnings("serial") -public class AvroTypeExample { - - - public static void main(String[] args) throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<User> users = env.createInput(new UserGeneratingInputFormat()); - - users - .map(new NumberExtractingMapper()) - .groupBy(1) - .reduceGroup(new ConcatenatingReducer()) - .print(); - - env.execute(); - } - - - - public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> { - - @Override - public Tuple2<User, Integer> map(User user) { - return new Tuple2<User, Integer>(user, user.getFavoriteNumber()); - } - } - - - public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> { - - @Override - public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception { - int number = 0; - StringBuilder colors = new StringBuilder(); - - for (Tuple2<User, Integer> u : values) { - number = u.f1; - 
colors.append(u.f0.getFavoriteColor()).append(" - "); - } - - colors.setLength(colors.length() - 3); - out.collect(new Tuple2<Integer, String>(number, colors.toString())); - } - } - - - public static final class UserGeneratingInputFormat extends GenericInputFormat<User> { - - private static final long serialVersionUID = 1L; - - private static final int NUM = 100; - - private final Random rnd = new Random(32498562304986L); - - private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" }; - - private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" }; - - private int count; - - - @Override - public boolean reachedEnd() throws IOException { - return count >= NUM; - } - - @Override - public User nextRecord(User reuse) throws IOException { - count++; - - User u = new User(); - u.setName(NAMES[rnd.nextInt(NAMES.length)]); - u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]); - u.setFavoriteNumber(rnd.nextInt(87)); - return u; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java deleted file mode 100644 index 4608f96..0000000 --- a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package org.apache.flink.api.io.avro.example; -@SuppressWarnings("all") -@org.apache.avro.specific.AvroGenerated -public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}"); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - @Deprecated public java.lang.CharSequence name; - @Deprecated public java.lang.Integer favorite_number; - @Deprecated public java.lang.CharSequence favorite_color; - - /** - * Default constructor. Note that this does not initialize fields - * to their default values from the schema. If that is desired then - * one should use {@link #newBuilder()}. - */ - public User() {} - - /** - * All-args constructor. - */ - public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) { - this.name = name; - this.favorite_number = favorite_number; - this.favorite_color = favorite_color; - } - - public org.apache.avro.Schema getSchema() { return SCHEMA$; } - // Used by DatumWriter. Applications should not call. 
- public java.lang.Object get(int field$) { - switch (field$) { - case 0: return name; - case 1: return favorite_number; - case 2: return favorite_color; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - // Used by DatumReader. Applications should not call. - @SuppressWarnings(value="unchecked") - public void put(int field$, java.lang.Object value$) { - switch (field$) { - case 0: name = (java.lang.CharSequence)value$; break; - case 1: favorite_number = (java.lang.Integer)value$; break; - case 2: favorite_color = (java.lang.CharSequence)value$; break; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - - /** - * Gets the value of the 'name' field. - */ - public java.lang.CharSequence getName() { - return name; - } - - /** - * Sets the value of the 'name' field. - * @param value the value to set. - */ - public void setName(java.lang.CharSequence value) { - this.name = value; - } - - /** - * Gets the value of the 'favorite_number' field. - */ - public java.lang.Integer getFavoriteNumber() { - return favorite_number; - } - - /** - * Sets the value of the 'favorite_number' field. - * @param value the value to set. - */ - public void setFavoriteNumber(java.lang.Integer value) { - this.favorite_number = value; - } - - /** - * Gets the value of the 'favorite_color' field. - */ - public java.lang.CharSequence getFavoriteColor() { - return favorite_color; - } - - /** - * Sets the value of the 'favorite_color' field. - * @param value the value to set. 
- */ - public void setFavoriteColor(java.lang.CharSequence value) { - this.favorite_color = value; - } - - /** Creates a new User RecordBuilder */ - public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() { - return new org.apache.flink.api.io.avro.example.User.Builder(); - } - - /** Creates a new User RecordBuilder by copying an existing Builder */ - public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) { - return new org.apache.flink.api.io.avro.example.User.Builder(other); - } - - /** Creates a new User RecordBuilder by copying an existing User instance */ - public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) { - return new org.apache.flink.api.io.avro.example.User.Builder(other); - } - - /** - * RecordBuilder for User instances. - */ - public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User> - implements org.apache.avro.data.RecordBuilder<User> { - - private java.lang.CharSequence name; - private java.lang.Integer favorite_number; - private java.lang.CharSequence favorite_color; - - /** Creates a new Builder */ - private Builder() { - super(org.apache.flink.api.io.avro.example.User.SCHEMA$); - } - - /** Creates a Builder by copying an existing Builder */ - private Builder(org.apache.flink.api.io.avro.example.User.Builder other) { - super(other); - if (isValidValue(fields()[0], other.name)) { - this.name = data().deepCopy(fields()[0].schema(), other.name); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.favorite_number)) { - this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.favorite_color)) { - this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); - fieldSetFlags()[2] = true; - } - } - - /** Creates a 
Builder by copying an existing User instance */ - private Builder(org.apache.flink.api.io.avro.example.User other) { - super(org.apache.flink.api.io.avro.example.User.SCHEMA$); - if (isValidValue(fields()[0], other.name)) { - this.name = data().deepCopy(fields()[0].schema(), other.name); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.favorite_number)) { - this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.favorite_color)) { - this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); - fieldSetFlags()[2] = true; - } - } - - /** Gets the value of the 'name' field */ - public java.lang.CharSequence getName() { - return name; - } - - /** Sets the value of the 'name' field */ - public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) { - validate(fields()[0], value); - this.name = value; - fieldSetFlags()[0] = true; - return this; - } - - /** Checks whether the 'name' field has been set */ - public boolean hasName() { - return fieldSetFlags()[0]; - } - - /** Clears the value of the 'name' field */ - public org.apache.flink.api.io.avro.example.User.Builder clearName() { - name = null; - fieldSetFlags()[0] = false; - return this; - } - - /** Gets the value of the 'favorite_number' field */ - public java.lang.Integer getFavoriteNumber() { - return favorite_number; - } - - /** Sets the value of the 'favorite_number' field */ - public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) { - validate(fields()[1], value); - this.favorite_number = value; - fieldSetFlags()[1] = true; - return this; - } - - /** Checks whether the 'favorite_number' field has been set */ - public boolean hasFavoriteNumber() { - return fieldSetFlags()[1]; - } - - /** Clears the value of the 'favorite_number' field */ - public 
org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() { - favorite_number = null; - fieldSetFlags()[1] = false; - return this; - } - - /** Gets the value of the 'favorite_color' field */ - public java.lang.CharSequence getFavoriteColor() { - return favorite_color; - } - - /** Sets the value of the 'favorite_color' field */ - public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) { - validate(fields()[2], value); - this.favorite_color = value; - fieldSetFlags()[2] = true; - return this; - } - - /** Checks whether the 'favorite_color' field has been set */ - public boolean hasFavoriteColor() { - return fieldSetFlags()[2]; - } - - /** Clears the value of the 'favorite_color' field */ - public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() { - favorite_color = null; - fieldSetFlags()[2] = false; - return this; - } - - @Override - public User build() { - try { - User record = new User(); - record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]); - record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]); - record.favorite_color = fieldSetFlags()[2] ? 
this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]); - return record; - } catch (Exception e) { - throw new org.apache.avro.AvroRuntimeException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java deleted file mode 100644 index 09fcacb..0000000 --- a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.api.java.io; - -import java.io.IOException; - -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.FileReader; -import org.apache.avro.file.SeekableInput; -import org.apache.avro.io.DatumReader; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.specific.SpecificDatumReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.avro.FSDataInputStreamWrapper; -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.InstantiationUtil; - - -public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class); - - private final Class<E> avroValueType; - - private boolean reuseAvroValue = true; - - private transient FileReader<E> dataFileReader; - - private transient long end; - - - public AvroInputFormat(Path filePath, Class<E> type) { - super(filePath); - this.avroValueType = type; - } - - - /** - * Sets the flag whether to reuse the Avro value instance for all records. - * By default, the input format reuses the Avro value. - * - * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise. - */ - public void setReuseAvroValue(boolean reuseAvroValue) { - this.reuseAvroValue = reuseAvroValue; - } - - /** - * If set, the InputFormat will only read entire files. 
- */ - public void setUnsplittable(boolean unsplittable) { - this.unsplittable = unsplittable; - } - - // -------------------------------------------------------------------------------------------- - // Typing - // -------------------------------------------------------------------------------------------- - - @Override - public TypeInformation<E> getProducedType() { - return TypeExtractor.getForClass(this.avroValueType); - } - - // -------------------------------------------------------------------------------------------- - // Input Format Methods - // -------------------------------------------------------------------------------------------- - - @Override - public void open(FileInputSplit split) throws IOException { - super.open(split); - - DatumReader<E> datumReader; - if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) { - datumReader = new SpecificDatumReader<E>(avroValueType); - } else { - datumReader = new ReflectDatumReader<E>(avroValueType); - } - - LOG.info("Opening split " + split); - - SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen()); - - dataFileReader = DataFileReader.openReader(in, datumReader); - dataFileReader.sync(split.getStart()); - this.end = split.getStart() + split.getLength(); - } - - @Override - public boolean reachedEnd() throws IOException { - return !dataFileReader.hasNext() || dataFileReader.pastSync(end); - } - - @Override - public E nextRecord(E reuseValue) throws IOException { - if (reachedEnd()) { - return null; - } - - if (!reuseAvroValue) { - reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class); - } - - reuseValue = dataFileReader.next(reuseValue); - return reuseValue; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java ---------------------------------------------------------------------- diff 
--git a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java deleted file mode 100644 index d00dbf7..0000000 --- a/flink-staging/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.api.java.io; - - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.flink.api.common.io.FileOutputFormat; -import org.apache.flink.core.fs.Path; - -import java.io.IOException; - -public class AvroOutputFormat<E> extends FileOutputFormat<E> { - - private static final long serialVersionUID = 1L; - - private final Class<E> avroValueType; - - private Schema userDefinedSchema = null; - - private transient DataFileWriter<E> dataFileWriter; - - public AvroOutputFormat(Path filePath, Class<E> type) { - super(filePath); - this.avroValueType = type; - } - - public AvroOutputFormat(Class<E> type) { - this.avroValueType = type; - } - - @Override - protected String getDirectoryFileName(int taskNumber) { - return super.getDirectoryFileName(taskNumber) + ".avro"; - } - - public void setSchema(Schema schema) { - this.userDefinedSchema = schema; - } - - @Override - public void writeRecord(E record) throws IOException { - dataFileWriter.append(record); - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException { - super.open(taskNumber, numTasks); - - DatumWriter<E> datumWriter; - Schema schema = null; - if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) { - datumWriter = new SpecificDatumWriter<E>(avroValueType); - try { - schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema(); - } catch (InstantiationException e) { - throw new RuntimeException(e.getMessage()); - } catch (IllegalAccessException e) { - throw new RuntimeException(e.getMessage()); - } - } else { - datumWriter = new ReflectDatumWriter<E>(avroValueType); - schema = ReflectData.get().getSchema(avroValueType); - } - dataFileWriter = new 
DataFileWriter<E>(datumWriter); - if (userDefinedSchema == null) { - dataFileWriter.create(schema, stream); - } else { - dataFileWriter.create(userDefinedSchema, stream); - } - } - - @Override - public void close() throws IOException { - dataFileWriter.flush(); - dataFileWriter.close(); - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/assembly/test-assembly.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml b/flink-staging/flink-avro/src/test/assembly/test-assembly.xml deleted file mode 100644 index 0f4561a..0000000 --- a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml +++ /dev/null @@ -1,36 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> - -<assembly> - <id>test-jar</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${project.build.testOutputDirectory}</directory> - <outputDirectory>/</outputDirectory> - <!--modify/add include to match your package(s) --> - <includes> - <include>org/apache/flink/api/avro/testjar/**</include> - </includes> - </fileSet> - </fileSets> -</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java deleted file mode 100644 index e2d91af..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.avro; - -import java.io.File; -import java.net.InetAddress; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.RemoteExecutor; -import org.apache.flink.client.program.Client; -import org.apache.flink.client.program.JobWithJars; -import org.apache.flink.client.program.PackagedProgram; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.plan.FlinkPlan; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; - -import org.junit.Assert; -import org.junit.Test; - - -public class AvroExternalJarProgramITCase { - - private static final String JAR_FILE = "target/maven-test-jar.jar"; - - private static final String TEST_DATA_FILE = "/testdata.avro"; - - @Test - public void testExternalProgram() { - - ForkableFlinkMiniCluster testMiniCluster = null; - - try { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - testMiniCluster = new ForkableFlinkMiniCluster(config, false); - testMiniCluster.start(); - - String jarFile = JAR_FILE; - String testData = getClass().getResource(TEST_DATA_FILE).toString(); - - PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData }); - - - config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort()); - - Client client = new Client(config); - - client.setPrintStatusDuringExecution(false); - client.runBlocking(program, 4); - - } - catch (Throwable t) { - System.err.println(t.getMessage()); - t.printStackTrace(); - Assert.fail("Error during the packaged program execution: " + t.getMessage()); - } - finally { - if (testMiniCluster != null) { - 
try { - testMiniCluster.stop(); - } catch (Throwable t) { - // ignore - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java deleted file mode 100644 index d40fec5..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.avro; - -import org.junit.Assert; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -import org.apache.avro.file.DataFileReader; -import org.apache.avro.io.DatumReader; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.flink.api.io.avro.example.User; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.io.AvroOutputFormat; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.test.util.JavaProgramTestBase; - -@SuppressWarnings("serial") -public class AvroOutputFormatITCase extends JavaProgramTestBase { - - public static String outputPath1; - - public static String outputPath2; - - public static String inputPath; - - public static String userData = "alice|1|blue\n" + - "bob|2|red\n" + - "john|3|yellow\n" + - "walt|4|black\n"; - - @Override - protected void preSubmit() throws Exception { - inputPath = createTempFile("user", userData); - outputPath1 = getTempDirPath("avro_output1"); - outputPath2 = getTempDirPath("avro_output2"); - } - - - @Override - protected void testProgram() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath) - .fieldDelimiter("|") - .types(String.class, Integer.class, String.class); - - //output the data with AvroOutputFormat for specific user type - DataSet<User> specificUser = input.map(new ConvertToUser()); - specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1); - - //output the data with AvroOutputFormat for reflect user type - DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective()); - reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), 
outputPath2); - - env.execute(); - } - - @Override - protected void postSubmit() throws Exception { - //compare result for specific user type - File [] output1; - File file1 = asFile(outputPath1); - if (file1.isDirectory()) { - output1 = file1.listFiles(); - // check for avro ext in dir. - for (File avroOutput : output1) { - Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro")); - } - } else { - output1 = new File[] {file1}; - } - List<String> result1 = new ArrayList<String>(); - DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class); - for (File avroOutput : output1) { - - DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1); - while (dataFileReader1.hasNext()) { - User user = dataFileReader1.next(); - result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor()); - } - } - for (String expectedResult : userData.split("\n")) { - Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult)); - } - - //compare result for reflect user type - File [] output2; - File file2 = asFile(outputPath2); - if (file2.isDirectory()) { - output2 = file2.listFiles(); - } else { - output2 = new File[] {file2}; - } - List<String> result2 = new ArrayList<String>(); - DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class); - for (File avroOutput : output2) { - DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2); - while (dataFileReader2.hasNext()) { - ReflectiveUser user = dataFileReader2.next(); - result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor()); - } - } - for (String expectedResult : userData.split("\n")) { - Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult)); - } - - - } - - - public final static class 
ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> { - - @Override - public User map(Tuple3<String, Integer, String> value) throws Exception { - return new User(value.f0, value.f1, value.f2); - } - } - - public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> { - - @Override - public ReflectiveUser map(User value) throws Exception { - return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString()); - } - } - - - public static class ReflectiveUser { - private String name; - private int favoriteNumber; - private String favoriteColor; - - public ReflectiveUser() {} - - public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) { - this.name = name; - this.favoriteNumber = favoriteNumber; - this.favoriteColor = favoriteColor; - } - - public String getName() { - return this.name; - } - public String getFavoriteColor() { - return this.favoriteColor; - } - public int getFavoriteNumber() { - return this.favoriteNumber; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java deleted file mode 100644 index c39db15..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java +++ /dev/null @@ -1,528 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.avro; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.Fixed16; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.util.StringUtils; -import org.junit.Test; - -import static org.junit.Assert.*; - -/** - * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization. 
- */ -public class EncoderDecoderTest { - @Test - public void testComplexStringsDirecty() { - try { - Random rnd = new Random(349712539451944123L); - - for (int i = 0; i < 10; i++) { - String testString = StringUtils.getRandomString(rnd, 10, 100); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - DataOutputEncoder encoder = new DataOutputEncoder(); - encoder.setOut(dataOut); - - encoder.writeString(testString); - dataOut.flush(); - dataOut.close(); - } - - byte[] data = baos.toByteArray(); - - // deserialize - { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dataIn = new DataInputStream(bais); - DataInputDecoder decoder = new DataInputDecoder(); - decoder.setIn(dataIn); - - String deserialized = decoder.readString(); - - assertEquals(testString, deserialized); - } - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - @Test - public void testPrimitiveTypes() { - - testObjectSerialization(new Boolean(true)); - testObjectSerialization(new Boolean(false)); - - testObjectSerialization(Byte.valueOf((byte) 0)); - testObjectSerialization(Byte.valueOf((byte) 1)); - testObjectSerialization(Byte.valueOf((byte) -1)); - testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE)); - testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE)); - - testObjectSerialization(Short.valueOf((short) 0)); - testObjectSerialization(Short.valueOf((short) 1)); - testObjectSerialization(Short.valueOf((short) -1)); - testObjectSerialization(Short.valueOf(Short.MIN_VALUE)); - testObjectSerialization(Short.valueOf(Short.MAX_VALUE)); - - testObjectSerialization(Integer.valueOf(0)); - testObjectSerialization(Integer.valueOf(1)); - testObjectSerialization(Integer.valueOf(-1)); - testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE)); - 
testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE)); - - testObjectSerialization(Long.valueOf(0)); - testObjectSerialization(Long.valueOf(1)); - testObjectSerialization(Long.valueOf(-1)); - testObjectSerialization(Long.valueOf(Long.MIN_VALUE)); - testObjectSerialization(Long.valueOf(Long.MAX_VALUE)); - - testObjectSerialization(Float.valueOf(0)); - testObjectSerialization(Float.valueOf(1)); - testObjectSerialization(Float.valueOf(-1)); - testObjectSerialization(Float.valueOf((float)Math.E)); - testObjectSerialization(Float.valueOf((float)Math.PI)); - testObjectSerialization(Float.valueOf(Float.MIN_VALUE)); - testObjectSerialization(Float.valueOf(Float.MAX_VALUE)); - testObjectSerialization(Float.valueOf(Float.MIN_NORMAL)); - testObjectSerialization(Float.valueOf(Float.NaN)); - testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY)); - testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY)); - - testObjectSerialization(Double.valueOf(0)); - testObjectSerialization(Double.valueOf(1)); - testObjectSerialization(Double.valueOf(-1)); - testObjectSerialization(Double.valueOf(Math.E)); - testObjectSerialization(Double.valueOf(Math.PI)); - testObjectSerialization(Double.valueOf(Double.MIN_VALUE)); - testObjectSerialization(Double.valueOf(Double.MAX_VALUE)); - testObjectSerialization(Double.valueOf(Double.MIN_NORMAL)); - testObjectSerialization(Double.valueOf(Double.NaN)); - testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY)); - testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY)); - - testObjectSerialization(""); - testObjectSerialization("abcdefg"); - testObjectSerialization("ab\u1535\u0155xyz\u706F"); - - testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523)); - testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001)); - } - - @Test - public void 
testArrayTypes() { - { - int[] array = new int[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - long[] array = new long[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - float[] array = new float[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - double[] array = new double[] {1, 2, 3, 4, 5}; - testObjectSerialization(array); - } - { - String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"}; - testObjectSerialization(array); - } - } - - @Test - public void testEmptyArray() { - { - int[] array = new int[0]; - testObjectSerialization(array); - } - { - long[] array = new long[0]; - testObjectSerialization(array); - } - { - float[] array = new float[0]; - testObjectSerialization(array); - } - { - double[] array = new double[0]; - testObjectSerialization(array); - } - { - String[] array = new String[0]; - testObjectSerialization(array); - } - } - - @Test - public void testObjects() { - // simple object containing only primitives - { - testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42)); - } - - // object with collection - { - ArrayList<String> list = new ArrayList<String>(); - list.add("A"); - list.add("B"); - list.add("C"); - list.add("D"); - list.add("E"); - - testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym")); - } - - // object with empty collection - { - ArrayList<String> list = new ArrayList<String>(); - testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus")); - } - } - - @Test - public void testNestedObjectsWithCollections() { - testObjectSerialization(new ComplexNestedObject2(true)); - } - - @Test - public void testGeneratedObjectWithNullableFields() { - List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" }); - List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true); - Map<CharSequence, Long> map = new 
HashMap<CharSequence, Long>(); - map.put("1", 1L); - map.put("2", 2L); - map.put("3", 3L); - - byte[] b = new byte[16]; - new Random().nextBytes(b); - Fixed16 f = new Fixed16(b); - Address addr = new Address(new Integer(239), "6th Main", "Bangalore", - "Karnataka", "560075"); - User user = new User("Freudenreich", 1337, "macintosh gray", - 1234567890L, 3.1415926, null, true, strings, bools, null, - Colors.GREEN, map, f, new Boolean(true), addr); - - testObjectSerialization(user); - } - - @Test - public void testVarLenCountEncoding() { - try { - long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL}; - - // write - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - - for (long val : values) { - DataOutputEncoder.writeVarLongCount(dataOut, val); - } - - dataOut.flush(); - dataOut.close(); - } - - // read - { - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dataIn = new DataInputStream(bais); - - for (long val : values) { - long read = DataInputDecoder.readVarLongCount(dataIn); - assertEquals("Wrong var-len encoded value read.", val, read); - } - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - private static <X> void testObjectSerialization(X obj) { - - try { - - // serialize - ByteArrayOutputStream baos = new ByteArrayOutputStream(512); - { - DataOutputStream dataOut = new DataOutputStream(baos); - DataOutputEncoder encoder = new DataOutputEncoder(); - encoder.setOut(dataOut); - - @SuppressWarnings("unchecked") - Class<X> clazz = (Class<X>) obj.getClass(); - ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz); - - writer.write(obj, encoder); - dataOut.flush(); - dataOut.close(); - } - - byte[] data = baos.toByteArray(); - X result = null; - - // 
deserialize - { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dataIn = new DataInputStream(bais); - DataInputDecoder decoder = new DataInputDecoder(); - decoder.setIn(dataIn); - - @SuppressWarnings("unchecked") - Class<X> clazz = (Class<X>) obj.getClass(); - ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz); - - // create a reuse object if possible, otherwise we have no reuse object - X reuse = null; - try { - @SuppressWarnings("unchecked") - X test = (X) obj.getClass().newInstance(); - reuse = test; - } catch (Throwable t) {} - - result = reader.read(reuse, decoder); - } - - // check - final String message = "Deserialized object is not the same as the original"; - - if (obj.getClass().isArray()) { - Class<?> clazz = obj.getClass(); - if (clazz == byte[].class) { - assertArrayEquals(message, (byte[]) obj, (byte[]) result); - } - else if (clazz == short[].class) { - assertArrayEquals(message, (short[]) obj, (short[]) result); - } - else if (clazz == int[].class) { - assertArrayEquals(message, (int[]) obj, (int[]) result); - } - else if (clazz == long[].class) { - assertArrayEquals(message, (long[]) obj, (long[]) result); - } - else if (clazz == char[].class) { - assertArrayEquals(message, (char[]) obj, (char[]) result); - } - else if (clazz == float[].class) { - assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f); - } - else if (clazz == double[].class) { - assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0); - } else { - assertArrayEquals(message, (Object[]) obj, (Object[]) result); - } - } else { - assertEquals(message, obj, result); - } - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Test failed due to an exception: " + e.getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - // Test Objects - // 
-------------------------------------------------------------------------------------------- - - - public static final class SimpleTypes { - - private final int iVal; - private final long lVal; - private final byte bVal; - private final String sVal; - private final short rVal; - private final double dVal; - - - public SimpleTypes() { - this(0, 0, (byte) 0, "", (short) 0, 0); - } - - public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) { - this.iVal = iVal; - this.lVal = lVal; - this.bVal = bVal; - this.sVal = sVal; - this.rVal = rVal; - this.dVal = dVal; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == SimpleTypes.class) { - SimpleTypes other = (SimpleTypes) obj; - - return other.iVal == this.iVal && - other.lVal == this.lVal && - other.bVal == this.bVal && - other.sVal.equals(this.sVal) && - other.rVal == this.rVal && - other.dVal == this.dVal; - - } else { - return false; - } - } - } - - public static class ComplexNestedObject1 { - - private double doubleValue; - - private List<String> stringList; - - public ComplexNestedObject1() {} - - public ComplexNestedObject1(int offInit) { - this.doubleValue = 6293485.6723 + offInit; - - this.stringList = new ArrayList<String>(); - this.stringList.add("A" + offInit); - this.stringList.add("somewhat" + offInit); - this.stringList.add("random" + offInit); - this.stringList.add("collection" + offInit); - this.stringList.add("of" + offInit); - this.stringList.add("strings" + offInit); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == ComplexNestedObject1.class) { - ComplexNestedObject1 other = (ComplexNestedObject1) obj; - return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList); - } else { - return false; - } - } - } - - public static class ComplexNestedObject2 { - - private long longValue; - - private Map<String, ComplexNestedObject1> theMap; - - public ComplexNestedObject2() {} - - public 
ComplexNestedObject2(boolean init) { - this.longValue = 46547; - - this.theMap = new HashMap<String, ComplexNestedObject1>(); - this.theMap.put("36354L", new ComplexNestedObject1(43546543)); - this.theMap.put("785611L", new ComplexNestedObject1(45784568)); - this.theMap.put("43L", new ComplexNestedObject1(9876543)); - this.theMap.put("-45687L", new ComplexNestedObject1(7897615)); - this.theMap.put("1919876876896L", new ComplexNestedObject1(27154)); - this.theMap.put("-868468468L", new ComplexNestedObject1(546435)); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == ComplexNestedObject2.class) { - ComplexNestedObject2 other = (ComplexNestedObject2) obj; - return other.longValue == this.longValue && this.theMap.equals(other.theMap); - } else { - return false; - } - } - } - - public static class Book { - - private long bookId; - private String title; - private long authorId; - - public Book() {} - - public Book(long bookId, String title, long authorId) { - this.bookId = bookId; - this.title = title; - this.authorId = authorId; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == Book.class) { - Book other = (Book) obj; - return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title); - } else { - return false; - } - } - } - - public static class BookAuthor { - - private long authorId; - private List<String> bookTitles; - private String authorName; - - public BookAuthor() {} - - public BookAuthor(long authorId, List<String> bookTitles, String authorName) { - this.authorId = authorId; - this.bookTitles = bookTitles; - this.authorName = authorName; - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() == BookAuthor.class) { - BookAuthor other = (BookAuthor) obj; - return other.authorName.equals(this.authorName) && other.authorId == this.authorId && - other.bookTitles.equals(this.bookTitles); - } else { - return false; - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java deleted file mode 100644 index 1174786..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.avro.testjar; - -// ================================================================================================ -// This file defines the classes for the AvroExternalJarProgramITCase. -// The program is exported into src/test/resources/AvroTestProgram.jar. -// -// THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED -// AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL -// NOT BE COVERED BY THIS TEST. 
-// ================================================================================================ - - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.functions.RichReduceFunction; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.core.fs.Path; - -public class AvroExternalJarProgram { - - public static final class Color { - - private String name; - private double saturation; - - public Color() { - name = ""; - saturation = 1.0; - } - - public Color(String name, double saturation) { - this.name = name; - this.saturation = saturation; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public double getSaturation() { - return saturation; - } - - public void setSaturation(double saturation) { - this.saturation = saturation; - } - - @Override - public String toString() { - return name + '(' + saturation + ')'; - } - } - - public static final class MyUser { - - private String name; - private List<Color> colors; - - public MyUser() { - name = "unknown"; - colors = new ArrayList<Color>(); - } - - public MyUser(String name, List<Color> colors) { - this.name = name; - this.colors = colors; - } - - public String getName() { - return name; - } - - public List<Color> getColors() { - return colors; - } - - public void setName(String name) { - this.name = name; - } - - public void setColors(List<Color> colors) { - 
this.colors = colors; - } - - @Override - public String toString() { - return name + " : " + colors; - } - } - - // -------------------------------------------------------------------------------------------- - - // -------------------------------------------------------------------------------------------- - - public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, MyUser> map(MyUser u) { - String namePrefix = u.getName().substring(0, 1); - return new Tuple2<String, MyUser>(namePrefix, u); - } - } - - public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) { - return val1; - } - } - - // -------------------------------------------------------------------------------------------- - // Test Data - // -------------------------------------------------------------------------------------------- - - public static final class Generator { - - private final Random rnd = new Random(2389756789345689276L); - - public MyUser nextUser() { - return randomUser(); - } - - private MyUser randomUser() { - - int numColors = rnd.nextInt(5); - ArrayList<Color> colors = new ArrayList<Color>(numColors); - for (int i = 0; i < numColors; i++) { - colors.add(new Color(randomString(), rnd.nextDouble())); - } - - return new MyUser(randomString(), colors); - } - - private String randomString() { - char[] c = new char[this.rnd.nextInt(20) + 5]; - - for (int i = 0; i < c.length; i++) { - c[i] = (char) (this.rnd.nextInt(150) + 40); - } - - return new String(c); - } - } - - public static void writeTestData(File testFile, int numRecords) throws IOException { - - DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class); - DataFileWriter<MyUser> 
dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter); - - dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile); - - - Generator generator = new Generator(); - - for (int i = 0; i < numRecords; i++) { - MyUser user = generator.nextUser(); - dataFileWriter.append(user); - } - - dataFileWriter.close(); - } - -// public static void main(String[] args) throws Exception { -// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath(); -// writeTestData(new File(testDataFile), 50); -// } - - public static void main(String[] args) throws Exception { - String inputPath = args[0]; - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class)); - - DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper()); - - result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>()); - env.execute(); - } -}