Repository: flink Updated Branches: refs/heads/release-1.1 e296acae5 -> 6662cc643
[FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints This closes #3549. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6662cc64 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6662cc64 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6662cc64 Branch: refs/heads/release-1.1 Commit: 6662cc64332a7c08efd7672d3abea3176529d774 Parents: e296aca Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Thu Mar 16 00:05:51 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Mar 16 12:41:20 2017 +0800 ---------------------------------------------------------------------- .../kafka/FlinkKafkaProducerBase.java | 18 +- .../kafka/AtLeastOnceProducerTest.java | 422 +++++++++++++------ .../testutils/FakeStandardProducerConfig.java | 34 ++ 3 files changed, 339 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6662cc64/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index e63f033..ea9caeb 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -107,7 +107,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im /** * If true, the producer will wait until all outstanding records have been send to the broker. */ - private boolean flushOnCheckpoint; + protected boolean flushOnCheckpoint; // -------------------------------- Runtime fields ------------------------------------------ @@ -330,7 +330,10 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im protected abstract void flush(); @Override - public Serializable snapshotState(long checkpointId, long checkpointTimestamp) { + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + // check for asynchronous errors and fail the checkpoint if necessary + checkErroneous(); + if (flushOnCheckpoint) { // flushing is activated: We need to wait until pendingRecords is 0 flush(); @@ -338,7 +341,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im if (pendingRecords != 0) { throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords); } - // pending records count is 0. We can now confirm the checkpoint + + // if the flushed requests has errors, we should propagate it also and fail the checkpoint + checkErroneous(); } } // return empty state @@ -374,4 +379,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); return props; } + + // this is exposed for testing purposes + protected long numPendingRecords() { + synchronized (pendingRecordsLock) { + return pendingRecords; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6662cc64/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java index b02593c..3dabceb 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java @@ -18,32 +18,36 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.Assert; import org.junit.Test; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Test ensuring that the producer is not dropping buffered records @@ -51,168 +55,322 @@ import java.util.concurrent.atomic.AtomicBoolean; @SuppressWarnings("unchecked") public class AtLeastOnceProducerTest { - // we set a timeout because the test will not finish if the logic is broken - @Test(timeout=5000) - public void testAtLeastOnceProducer() throws Throwable { - runTest(true); - } + /** + * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown + */ + @Test + public void testAsyncErrorRethrownOnInvoke() throws Throwable { + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); - // This test ensures that the actual test fails if the flushing is disabled - @Test(expected = AssertionError.class, timeout=5000) - public void ensureTestFails() throws Throwable { - runTest(false); + testHarness.processElement(new StreamRecord<>("msg-1")); + + // let the message request return an async exception + producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception")); + + try { + testHarness.processElement(new StreamRecord<>("msg-2")); + } catch (Exception e) { + // the next invoke should rethrow the async exception + Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception")); + + // test succeeded + return; + } + + Assert.fail(); } - private void runTest(boolean flushOnCheckpoint) throws Throwable { - Properties props = new Properties(); - final AtomicBoolean snapshottingFinished = new AtomicBoolean(false); - final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, - snapshottingFinished); - producer.setFlushOnCheckpoint(flushOnCheckpoint); - producer.setRuntimeContext(new MockRuntimeContext(0, 1)); + /** + * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown + */ + @Test + public void testAsyncErrorRethrownOnCheckpoint() throws Throwable { + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); - producer.open(new Configuration()); + testHarness.open(); - for (int i = 0; i < 100; i++) { - producer.invoke("msg-" + i); + testHarness.processElement(new StreamRecord<>("msg-1")); + + // let the message request return an async exception + producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception")); + + try { + testHarness.snapshot(123L, 123L); + } catch (Exception e) { + // the next invoke should rethrow the async exception + Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception")); + + // test succeeded + return; } - // start a thread confirming all pending records - final Tuple1<Throwable> runnableError = new Tuple1<>(null); - final Thread threadA = Thread.currentThread(); - Runnable confirmer = new Runnable() { + Assert.fail(); + } + + /** + * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint, + * it should be rethrown; we set a timeout because the test will not finish if the logic is broken. + * + * Note that this test does not test the snapshot method is blocked correctly when there are pending recorrds. + * The test for that is covered in testAtLeastOnceProducer. + */ + @SuppressWarnings("unchecked") + @Test(timeout=5000) + public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable { + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null); + producer.setFlushOnCheckpoint(true); + + final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer(); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + testHarness.processElement(new StreamRecord<>("msg-2")); + testHarness.processElement(new StreamRecord<>("msg-3")); + + verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class)); + + // only let the first callback succeed for now + producer.getPendingCallbacks().get(0).onCompletion(null, null); + + final AtomicReference<Exception> exceptionRef = new AtomicReference<>(); + Thread snapshotThread = new Thread(new Runnable() { @Override public void run() { try { - MockProducer mp = producer.getProducerInstance(); - List<Callback> pending = mp.getPending(); - - // we need to find out if the snapshot() method blocks forever - // this is not possible. If snapshot() is running, it will - // start removing elements from the pending list. - synchronized (threadA) { - threadA.wait(500L); - } - // we now check that no records have been confirmed yet - Assert.assertEquals(100, pending.size()); - Assert.assertFalse("Snapshot method returned before all records were confirmed", - snapshottingFinished.get()); - - // now confirm all checkpoints - for (Callback c: pending) { - c.onCompletion(null, null); - } - pending.clear(); - } catch(Throwable t) { - runnableError.f0 = t; + testHarness.snapshot(123L, 123L); + } catch (Exception e) { + exceptionRef.compareAndSet(null, e); } } - }; - Thread threadB = new Thread(confirmer); - threadB.start(); - // this should block: - producer.snapshotState(0, 0); - synchronized (threadA) { - threadA.notifyAll(); // just in case, to let the test fail faster - } - Assert.assertEquals(0, producer.getProducerInstance().getPending().size()); - Deadline deadline = FiniteDuration.apply(5, "s").fromNow(); - while (deadline.hasTimeLeft() && threadB.isAlive()) { - threadB.join(500); - } - Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive()); - if (runnableError.f0 != null) { - throw runnableError.f0; - } + }); + snapshotThread.start(); + + // let the 2nd message fail with an async exception + producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message")); + producer.getPendingCallbacks().get(2).onCompletion(null, null); + + snapshotThread.join(); - producer.close(); + // the snapshot should have failed with the async exception + Exception snapshotError = exceptionRef.get(); + assertTrue(snapshotError != null); + assertTrue(snapshotError.getCause().getMessage().contains("artificial async failure for 2nd message")); } + /** + * Test ensuring that the producer is not dropping buffered records; + * we set a timeout because the test will not finish if the logic is broken + */ + @SuppressWarnings("unchecked") + @Test(timeout=10000) + public void testAtLeastOnceProducer() throws Throwable { + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null); - private static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T> { - private MockProducer prod; - private AtomicBoolean snapshottingFinished; + // enable flushing + producer.setFlushOnCheckpoint(true); - public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, AtomicBoolean snapshottingFinished) { - super(defaultTopicId, serializationSchema, producerConfig, null); - this.snapshottingFinished = snapshottingFinished; - } + final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer(); - @Override - protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) { - this.prod = new MockProducer(); - return this.prod; - } + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); - @Override - public Serializable snapshotState(long checkpointId, long checkpointTimestamp) { - // call the actual snapshot state - Serializable ret = super.snapshotState(checkpointId, checkpointTimestamp); - // notify test that snapshotting has been done - snapshottingFinished.set(true); - return ret; - } + testHarness.open(); - @Override - protected void flush() { - this.prod.flush(); - } + testHarness.processElement(new StreamRecord<>("msg-1")); + testHarness.processElement(new StreamRecord<>("msg-2")); + testHarness.processElement(new StreamRecord<>("msg-3")); - public MockProducer getProducerInstance() { - return this.prod; - } + verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class)); + Assert.assertEquals(3, producer.getPendingSize()); + + // start a thread to perform checkpointing + final AtomicReference<Exception> exceptionRef = new AtomicReference<>(); + Thread snapshotThread = new Thread(new Runnable() { + @Override + public void run() { + try { + // this should block until all records are flushed; + // if the snapshot implementation returns before pending records are flushed, + testHarness.snapshot(123L, 123L); + } catch (Exception e) { + exceptionRef.compareAndSet(null, e); + } + } + }); + snapshotThread.start(); + + // before proceeding, make sure that flushing has started and that the snapshot is still blocked; + // this would block forever if the snapshot didn't perform a flush + producer.waitUntilFlushStarted(); + Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive()); + + // now, complete the callbacks + producer.getPendingCallbacks().get(0).onCompletion(null, null); + Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive()); + Assert.assertEquals(2, producer.getPendingSize()); + + producer.getPendingCallbacks().get(1).onCompletion(null, null); + Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive()); + Assert.assertEquals(1, producer.getPendingSize()); + + producer.getPendingCallbacks().get(2).onCompletion(null, null); + Assert.assertEquals(0, producer.getPendingSize()); + + snapshotThread.join(); + + // snapshot would fail with an exception if flushing wasn't completed before the snapshot method returned; + // make sure this did not happen + assertTrue(exceptionRef.get() == null); + + testHarness.close(); } - private static class MockProducer<K, V> extends KafkaProducer<K, V> { - List<Callback> pendingCallbacks = new ArrayList<>(); + /** + * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled, + * the snapshot method does indeed finishes without waiting for pending records; + * we set a timeout because the test will not finish if the logic is broken + */ + @SuppressWarnings("unchecked") + @Test(timeout=5000) + public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable { + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null); + producer.setFlushOnCheckpoint(false); + + final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer(); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg")); + + // make sure that all callbacks have not been completed + verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class)); + + // should return even if there are pending records + testHarness.snapshot(123L, 123L); + + testHarness.close(); + } + + // ------------------------------------------------------------------------ + + private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> { + private static final long serialVersionUID = 1L; + + private final static String DUMMY_TOPIC = "dummy-topic"; + + private transient KafkaProducer<?, ?> mockProducer; + private transient List<Callback> pendingCallbacks; + private transient MultiShotLatch flushLatch; + private boolean isFlushed; - private static Properties getFakeProperties() { - Properties p = new Properties(); - p.setProperty("bootstrap.servers", "localhost:12345"); - p.setProperty("key.serializer", ByteArraySerializer.class.getName()); - p.setProperty("value.serializer", ByteArraySerializer.class.getName()); - return p; + @SuppressWarnings("unchecked") + DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) { + + super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner); + + this.pendingCallbacks = new ArrayList<>(); + this.flushLatch = new MultiShotLatch(); } - public MockProducer() { - super(getFakeProperties()); + + long getPendingSize() { + if (flushOnCheckpoint) { + return numPendingRecords(); + } else { + // when flushing is disabled, the implementation does not + // maintain the current number of pending records to reduce + // the extra locking overhead required to do so + throw new UnsupportedOperationException("getPendingSize not supported when flushing is disabled"); + } } - @Override - public Future<RecordMetadata> send(ProducerRecord<K, V> record) { - throw new UnsupportedOperationException("Unexpected"); + List<Callback> getPendingCallbacks() { + return pendingCallbacks; } - @Override - public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { - pendingCallbacks.add(callback); - return null; + KafkaProducer<?, ?> getMockKafkaProducer() { + return mockProducer; } @Override - public List<PartitionInfo> partitionsFor(String topic) { - List<PartitionInfo> list = new ArrayList<>(); - list.add(new PartitionInfo(topic, 0, null, null, null)); - return list; + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + isFlushed = false; + + Serializable snapshot = super.snapshotState(checkpointId, checkpointTimestamp); + + // if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test + if (flushOnCheckpoint && !isFlushed) { + throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed"); + } + + return snapshot; } - @Override - public Map<MetricName, ? extends Metric> metrics() { - return null; + public void waitUntilFlushStarted() throws Exception { + flushLatch.await(); } + @SuppressWarnings("unchecked") + @Override + protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) { + if (this.mockProducer == null) { + this.mockProducer = mock(KafkaProducer.class); + when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + pendingCallbacks.add((Callback) invocationOnMock.getArguments()[1]); + return null; + } + }); + } - public List<Callback> getPending() { - return this.pendingCallbacks; + return (KafkaProducer<K, V>) this.mockProducer; } - public void flush() { - while (pendingCallbacks.size() > 0) { + @Override + protected void flush() { + flushLatch.trigger(); + + // simply wait until the producer's pending records become zero. + // This relies on the fact that the producer's Callback implementation + // and pending records tracking logic is implemented correctly, otherwise + // we will loop forever. + while (numPendingRecords() > 0) { try { Thread.sleep(10); } catch (InterruptedException e) { throw new RuntimeException("Unable to flush producer, task was interrupted"); } } + + isFlushed = true; + } + + @Override + public RuntimeContext getRuntimeContext() { + StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class); + when(runtimeContext.isCheckpointingEnabled()).thenReturn(true); + when(runtimeContext.getMetricGroup()).thenReturn(mock(MetricGroup.class)); + return runtimeContext; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6662cc64/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java new file mode 100644 index 0000000..055326d --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import java.util.Properties; + +public class FakeStandardProducerConfig { + + public static Properties get() { + Properties p = new Properties(); + p.setProperty("bootstrap.servers", "localhost:12345"); + p.setProperty("key.serializer", ByteArraySerializer.class.getName()); + p.setProperty("value.serializer", ByteArraySerializer.class.getName()); + return p; + } + +}
