RYA-377 Implemented a Kafka version of GetQueryResultStream.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/fc9775e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/fc9775e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/fc9775e2 Branch: refs/heads/master Commit: fc9775e2639b8efa263c4f54f32f233da7e0b397 Parents: 6056528 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Tue Oct 31 18:24:36 2017 -0400 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:00 2018 -0500 ---------------------------------------------------------------------- .../streams/api/entity/QueryResultStream.java | 24 ++- .../apache/rya/streams/kafka/KafkaTopics.java | 21 +- .../kafka/entity/KafkaQueryResultStream.java | 99 ++++++++++ .../interactor/KafkaGetQueryResultStream.java | 108 ++++++++++ .../interactor/KafkaGetQueryResultStreamIT.java | 195 +++++++++++++++++++ .../rya/test/kafka/KafkaTestInstanceRule.java | 14 ++ 6 files changed, 455 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java index fdd62df..aa5dcfd 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/QueryResultStream.java @@ -18,7 +18,8 @@ */ package org.apache.rya.streams.api.entity; -import java.util.Collection; +import static java.util.Objects.requireNonNull; + import java.util.UUID; import org.apache.rya.api.model.VisibilityBindingSet; @@ 
-31,12 +32,26 @@ import edu.umd.cs.findbugs.annotations.NonNull; * An infinite stream of {@link VisibilityBindingSet}s that are the results of a query within Rya Streams. */ @DefaultAnnotation(NonNull.class) -public interface QueryResultStream extends AutoCloseable { +public abstract class QueryResultStream implements AutoCloseable { + + private final UUID queryId; + + /** + * Constructs an instance of {@link QueryResultStream}. + * + * @param queryId - The query whose results this stream iterates over. (not null) + */ + public QueryResultStream(final UUID queryId) { + this.queryId = requireNonNull(queryId); + } + /** * @return Identifies which query in Rya Streams this result stream is over. */ - public UUID getQueryId(); + public UUID getQueryId() { + return queryId; + } /** * Wait at most {@code timeoutMs} milliseconds for the next collection of results. @@ -44,7 +59,8 @@ public interface QueryResultStream extends AutoCloseable { * @param timeoutMs - The number of milliseconds to at most wait for the next collection of results. (not null) * @return The next collection of {@link VisibilityBindingSet}s that are the result of the query. Empty if * there were no new results within the timeout period. + * @throws IllegalStateException If the stream has been closed. * @throws RyaStreamsException Could not fetch the next set of results. 
*/ - public Collection<VisibilityBindingSet> poll(long timeoutMs) throws RyaStreamsException; + public abstract Iterable<VisibilityBindingSet> poll(long timeoutMs) throws IllegalStateException, RyaStreamsException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java index dfc4c9d..a8fbf23 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java @@ -18,6 +18,10 @@ */ package org.apache.rya.streams.kafka; +import static java.util.Objects.requireNonNull; + +import java.util.UUID; + import org.apache.rya.streams.api.queries.QueryChangeLog; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -30,23 +34,36 @@ import edu.umd.cs.findbugs.annotations.NonNull; public class KafkaTopics { /** - * Creates the Kafka topic that will be used for a specific instance of Rya's {@link QueryChangeLog}. + * Creates the Kafka topic name that is used for a specific instance of Rya's {@link QueryChangeLog}. * * @param ryaInstance - The Rya instance the change log is for. (not null) * @return The name of the Kafka topic. */ public static String queryChangeLogTopic(final String ryaInstance) { + requireNonNull(ryaInstance); return ryaInstance + "-QueryChangeLog"; } /** - * Creates the Kafka topic that will be used to load statements into the Rya Streams system for a specific + * Creates the Kafka topic name that is used to load statements into the Rya Streams system for a specific * instance of Rya. * * @param ryaInstance - The Rya instance the statements are for. 
(not null) * @return The name of the Kafka topic. */ public static String statementsTopic(final String ryaInstance) { + requireNonNull(ryaInstance); return ryaInstance + "-Statements"; } + + /** + * Creates the Kafka topic name that is used for a specific query that is managed within Rya Streams. + * + * @param queryId - The id of the query the topic is for. + * @return The name of the Kafka topic. + */ + public static String queryResultsTopic(final UUID queryId) { + requireNonNull(queryId); + return "QueryResults-" + queryId.toString(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java new file mode 100644 index 0000000..360aaa2 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/entity/KafkaQueryResultStream.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.entity; + +import static java.util.Objects.requireNonNull; + +import java.util.Iterator; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.api.entity.QueryResultStream; +import org.apache.rya.streams.api.exception.RyaStreamsException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka implementation of {@link QueryResultStream}. It delegates the {@link #poll(long)} method to + * a {@link Consumer}. As a result, the starting point of this stream is whatever position the consumer + * starts at within the Kafka topic. + */ +@DefaultAnnotation(NonNull.class) +public class KafkaQueryResultStream extends QueryResultStream { + + private final Consumer<?, VisibilityBindingSet> consumer; + + /** + * Constructs an instance of {@link KafkaQueryResultStream}. + * + * @param queryId - The query the results are for. (not null) + * @param consumer - The consumer that will be polled by this class. (not null) + */ + public KafkaQueryResultStream(final UUID queryId, final Consumer<?, VisibilityBindingSet> consumer) { + super(queryId); + this.consumer = requireNonNull(consumer); + } + + @Override + public Iterable<VisibilityBindingSet> poll(final long timeoutMs) throws RyaStreamsException { + return new RecordEntryIterable<>( consumer.poll(timeoutMs) ); + } + + /** + * An {@link Iterable} that creates {@link Iterator}s over a {@link ConsumerRecords}' values. + * This is useful for when you don't care about the key portion of a record. + * + * @param <K> - The type of the record's key. + * @param <T> - The type of the record's value. 
+ */ + private final class RecordEntryIterable<K, T> implements Iterable<T> { + + private final ConsumerRecords<K, T> records; + + public RecordEntryIterable(final ConsumerRecords<K, T> records) { + this.records = requireNonNull(records); + } + + @Override + public Iterator<T> iterator() { + return new Iterator<T>() { + private final Iterator<ConsumerRecord<K, T>> it = records.iterator(); + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() { + return it.next().value(); + } + }; + } + } + + @Override + public void close() throws Exception { + consumer.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java new file mode 100644 index 0000000..b3c3fea --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStream.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.interactor; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.api.entity.QueryResultStream; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.GetQueryResultStream; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.entity.KafkaQueryResultStream; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka topic implementation of {@link GetQueryResultStream}. + */ +@DefaultAnnotation(NonNull.class) +public class KafkaGetQueryResultStream implements GetQueryResultStream { + + private final String bootstrapServers; + + /** + * Constructs an instance of {@link KafkaGetQueryResultStream}. + * + * @param kafkaHostname - The hostname of the Kafka Broker to connect to. (not null) + * @param kafkaPort - The port of the Kafka Broker to connect to. 
(not null) + */ + public KafkaGetQueryResultStream(final String kafkaHostname, final String kafkaPort) { + requireNonNull(kafkaHostname); + requireNonNull(kafkaPort); + bootstrapServers = kafkaHostname + ":" + kafkaPort; + } + + @Override + public QueryResultStream fromStart(final UUID queryId) throws RyaStreamsException { + requireNonNull(queryId); + + // Always start at the earliest point within the topic. + return makeStream(queryId, "earliest"); + } + + @Override + public QueryResultStream fromNow(final UUID queryId) throws RyaStreamsException { + requireNonNull(queryId); + + // Always start at the latest point within the topic. + return makeStream(queryId, "latest"); + } + + private QueryResultStream makeStream(final UUID queryId, final String autoOffsetResetConfig) { + // Configure which instance of Kafka to connect to. + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + // Nothing meaningful is in the key and the value is a VisibilityBindingSet. + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityBindingSetDeserializer.class); + + // Use a UUID for the Group Id so that we never register as part of the same group as another consumer. + props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + + // Set a client id so that server side logging can be traced. + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Query-Result-Stream-" + queryId); + + // These consumers always start at a specific point and move forwards until the caller is finished with + // the returned stream, so never commit the consumer's progress. 
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + // We are not closing the consumer here because the returned QueryResultStream is responsible for closing the + // underlying resources required to process it. + final KafkaConsumer<Object, VisibilityBindingSet> consumer = new KafkaConsumer<>(props); + + // Register the consumer for the query's results. + final String resultTopic = KafkaTopics.queryResultsTopic(queryId); + consumer.subscribe(Arrays.asList(resultTopic)); + + // Return the result stream. + return new KafkaQueryResultStream(queryId, consumer); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java new file mode 100644 index 0000000..3343f76 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.interactor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.api.entity.QueryResultStream; +import org.apache.rya.streams.api.interactor.GetQueryResultStream; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Integration tests the methods of {@link KafkaGetQueryResultStream}. + */ +public class KafkaGetQueryResultStreamIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + /** + * @return A {@link Producer} that is able to write {@link VisibilityBindingSet}s. 
+ */ + private Producer<?, VisibilityBindingSet> makeProducer() { + final Properties producerProps = kafka.createBootstrapServerConfig(); + producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityBindingSetSerializer.class.getName()); + return new KafkaProducer<>(producerProps); + } + + /** + * Polls a {@link QueryResultStream} until it has either polled too many times without hitting + * the target number of results, or it hits the target number of results. + * + * @param pollMs - How long each poll could take. + * @param pollIterations - The maximum number of polls that will be attempted. + * @param targetSize - The number of results to read before stopping. + * @param stream - The stream that will be polled. + * @return The results that were read from the stream. + * @throws Exception If the poll failed. + */ + private List<VisibilityBindingSet> pollForResults( + final int pollMs, + final int pollIterations, + final int targetSize, + final QueryResultStream stream) throws Exception{ + final List<VisibilityBindingSet> read = new ArrayList<>(); + + int i = 0; + while(read.size() < targetSize && i < pollIterations) { + for(final VisibilityBindingSet visBs : stream.poll(pollMs)) { + read.add( visBs ); + } + i++; + } + + return read; + } + + @Test + public void fromStart() throws Exception { + // Create an ID for the query. + final UUID queryId = UUID.randomUUID(); + + // Create a list of test VisibilityBindingSets. 
+ final List<VisibilityBindingSet> original = new ArrayList<>(); + + final ValueFactory vf = new ValueFactoryImpl(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("urn:name", vf.createLiteral("Alice")); + original.add(new VisibilityBindingSet(bs, "a|b|c")); + + bs = new MapBindingSet(); + bs.addBinding("urn:name", vf.createLiteral("Bob")); + original.add(new VisibilityBindingSet(bs, "a")); + + bs = new MapBindingSet(); + bs.addBinding("urn:name", vf.createLiteral("Charlie")); + original.add(new VisibilityBindingSet(bs, "b|c")); + + // Write some entries to the query result topic in Kafka. + try(final Producer<?, VisibilityBindingSet> producer = makeProducer()) { + final String resultTopic = KafkaTopics.queryResultsTopic(queryId); + for(final VisibilityBindingSet visBs : original) { + producer.send(new ProducerRecord<>(resultTopic, visBs)); + } + } + + // Use the interactor that is being tested to read all of the visibility binding sets. + final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort()); + final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId)); + + // Show the fetched binding sets match the original, as well as their order. + assertEquals(original, read); + } + + @Test + public void fromNow() throws Exception { + // Create an ID for the query. + final UUID queryId = UUID.randomUUID(); + + try(final Producer<?, VisibilityBindingSet> producer = makeProducer()) { + final String resultTopic = KafkaTopics.queryResultsTopic(queryId); + + // Write a single visibility binding set to the query's result topic. This will not appear in the expected results. 
+ final ValueFactory vf = new ValueFactoryImpl(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("urn:name", vf.createLiteral("Alice")); + producer.send(new ProducerRecord<>(resultTopic, new VisibilityBindingSet(bs, "a|b|c"))); + producer.flush(); + + // Use the interactor that is being tested to read all of the visibility binding sets that appear after this point. + final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort()); + try(QueryResultStream results = interactor.fromNow(queryId)) { + // Read results from the stream. + List<VisibilityBindingSet> read = new ArrayList<>(); + for(final VisibilityBindingSet visBs : results.poll(500)) { + read.add(visBs); + } + + // Show nothing has been read. + assertTrue(read.isEmpty()); + + // Write two more entries to the result topic. These will be seen by the result stream. + final List<VisibilityBindingSet> original = new ArrayList<>(); + + bs = new MapBindingSet(); + bs.addBinding("urn:name", vf.createLiteral("Bob")); + original.add(new VisibilityBindingSet(bs, "a")); + + bs = new MapBindingSet(); + bs.addBinding("urn:name", vf.createLiteral("Charlie")); + original.add(new VisibilityBindingSet(bs, "b|c")); + + for(final VisibilityBindingSet visBs : original) { + producer.send(new ProducerRecord<>(resultTopic, visBs)); + } + producer.flush(); + + // Read the results from the result stream. + read = pollForResults(500, 3, 2, results); + + // Show the new entries were read. + assertEquals(original, read); + } + } + } + + @Test(expected = IllegalStateException.class) + public void pollClosedStream() throws Exception { + // Create an ID for the query. + final UUID queryId = UUID.randomUUID(); + + // Use the interactor that is being tested to create a result stream and immediately close it. 
+ final GetQueryResultStream interactor = new KafkaGetQueryResultStream(kafka.getKafkaHostname(), kafka.getKafkaPort()); + final QueryResultStream results = interactor.fromStart(queryId); + results.close(); + + // Try to poll the closed stream. + results.poll(1); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fc9775e2/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java ---------------------------------------------------------------------- diff --git a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java index 41a0a67..f76fa2b 100644 --- a/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java +++ b/test/kafka/src/main/java/org/apache/rya/test/kafka/KafkaTestInstanceRule.java @@ -103,4 +103,18 @@ public class KafkaTestInstanceRule extends ExternalResource { public Properties createBootstrapServerConfig() { return kafkaInstance.createBootstrapServerConfig(); } + + /** + * @return The hostname of the Kafka Broker. + */ + public String getKafkaHostname() { + return kafkaInstance.getBrokerHost(); + } + + /** + * @return The port of the Kafka Broker. + */ + public String getKafkaPort() { + return kafkaInstance.getBrokerPort(); + } } \ No newline at end of file