[
https://issues.apache.org/jira/browse/AVRO-2214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635734#comment-16635734
]
ASF GitHub Bot commented on AVRO-2214:
--------------------------------------
thiru-apache closed pull request #328: AVRO-2214 Support sync and seek in C++
DataFileReader
URL: https://github.com/apache/avro/pull/328
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/lang/c++/api/DataFile.hh b/lang/c++/api/DataFile.hh
index bff309770..4236d3537 100644
--- a/lang/c++/api/DataFile.hh
+++ b/lang/c++/api/DataFile.hh
@@ -172,12 +172,14 @@ public:
*/
class AVRO_DECL DataFileReaderBase : boost::noncopyable {
const std::string filename_;
- const std::auto_ptr<InputStream> stream_;
+ const std::auto_ptr<SeekableInputStream> stream_;
const DecoderPtr decoder_;
int64_t objectCount_;
bool eof_;
Codec codec_;
-
+ int64_t blockStart_;
+ int64_t blockEnd_;
+
ValidSchema readerSchema_;
ValidSchema dataSchema_;
DecoderPtr dataDecoder_;
@@ -247,6 +249,29 @@ public:
* Closes the reader. No further operation is possible on this reader.
*/
void close();
+
+ /**
+ * Move to a specific, known synchronization point, for example one
returned
+ * from tell() after sync().
+ */
+ void seek(int64_t position);
+
+ /**
+ * Move to the next synchronization point after a position. To process a
+ * range of file entires, call this with the starting position, then check
+ * pastSync() with the end point before each use of decoder().
+ */
+ void sync(int64_t position);
+
+ /**
+ * Return true if past the next synchronization point after a position.
+ */
+ bool pastSync(int64_t position);
+
+ /**
+ * Return the last synchronization point before our current position.
+ */
+ int64_t previousSync();
};
/**
@@ -330,6 +355,29 @@ public:
* Closes the reader. No further operation is possible on this reader.
*/
void close() { return base_->close(); }
+
+ /**
+ * Move to a specific, known synchronization point, for example one
returned
+ * from previousSync().
+ */
+ void seek(int64_t position) { base_->seek(position); }
+
+ /**
+ * Move to the next synchronization point after a position. To process a
+ * range of file entires, call this with the starting position, then check
+ * pastSync() with the end point before each call to read().
+ */
+ void sync(int64_t position) { base_->sync(position); }
+
+ /**
+ * Return true if past the next synchronization point after a position.
+ */
+ bool pastSync(int64_t position) { return base_->pastSync(position); }
+
+ /**
+ * Return the last synchronization point before our current position.
+ */
+ int64_t previousSync() { return base_->previousSync(); }
};
} // namespace avro
diff --git a/lang/c++/api/Stream.hh b/lang/c++/api/Stream.hh
index 92b2334d2..42ccf0a00 100644
--- a/lang/c++/api/Stream.hh
+++ b/lang/c++/api/Stream.hh
@@ -75,6 +75,31 @@ public:
virtual size_t byteCount() const = 0;
};
+/**
+ * An InputStream which also supports seeking to a specific offset.
+ */
+class AVRO_DECL SeekableInputStream : public InputStream {
+protected:
+
+ /**
+ * An empty constuctor.
+ */
+ SeekableInputStream() { }
+
+public:
+ /**
+ * Destructor.
+ */
+ virtual ~SeekableInputStream() { }
+
+ /**
+ * Seek to a specific position in the stream. This may invalidate pointers
+ * returned from next(). This will also reset byteCount() to the given
+ * position.
+ */
+ virtual void seek(int64_t position) = 0;
+};
+
/**
* A no-copy output stream.
*/
@@ -161,8 +186,10 @@ AVRO_DECL std::auto_ptr<OutputStream>
fileOutputStream(const char* filename,
* Returns a new InputStream whose contents come from the given file.
* Data is read in chunks of given buffer size.
*/
-AVRO_DECL std::auto_ptr<InputStream> fileInputStream(const char* filename,
- size_t bufferSize = 8 * 1024);
+AVRO_DECL std::auto_ptr<InputStream> fileInputStream(
+ const char *filename, size_t bufferSize = 8 * 1024);
+AVRO_DECL std::auto_ptr<SeekableInputStream> fileSeekableInputStream(
+ const char *filename, size_t bufferSize = 8 * 1024);
/**
* Returns a new OutputStream whose contents will be sent to the given
@@ -177,8 +204,8 @@ AVRO_DECL std::auto_ptr<OutputStream>
ostreamOutputStream(std::ostream& os,
* std::istream. The std::istream object should outlive the returned
* InputStream.
*/
-AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(std::istream& in,
- size_t bufferSize = 8 * 1024);
+AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(
+ std::istream &in, size_t bufferSize = 8 * 1024);
/** A convenience class for reading from an InputStream */
struct StreamReader {
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index ee8f62c6a..181689da0 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -249,8 +249,9 @@ void DataFileWriterBase::setMetadata(const string& key,
const string& value)
}
DataFileReaderBase::DataFileReaderBase(const char* filename) :
- filename_(filename), stream_(fileInputStream(filename)),
- decoder_(binaryDecoder()), objectCount_(0), eof_(false)
+ filename_(filename), stream_(fileSeekableInputStream(filename)),
+ decoder_(binaryDecoder()), objectCount_(0), eof_(false), blockStart_(-1),
+ blockEnd_(-1)
{
readHeader();
}
@@ -295,7 +296,7 @@ std::ostream& operator << (std::ostream& os, const
DataFileSync& s)
bool DataFileReaderBase::hasMore()
{
- if (eof_) {
+ if (eof_) {
return false;
} else if (objectCount_ != 0) {
return true;
@@ -358,6 +359,7 @@ auto_ptr<InputStream> boundedInputStream(InputStream& in,
size_t limit)
bool DataFileReaderBase::readDataBlock()
{
decoder_->init(*stream_);
+ blockStart_ = stream_->byteCount();
const uint8_t* p = 0;
size_t n = 0;
if (! stream_->next(&p, &n)) {
@@ -369,7 +371,8 @@ bool DataFileReaderBase::readDataBlock()
int64_t byteCount;
avro::decode(*decoder_, byteCount);
decoder_->init(*stream_);
-
+ blockEnd_ = stream_->byteCount() + byteCount;
+
auto_ptr<InputStream> st = boundedInputStream(*stream_,
static_cast<size_t>(byteCount));
if (codec_ == NULL_CODEC) {
dataDecoder_->init(*st);
@@ -489,6 +492,73 @@ void DataFileReaderBase::readHeader()
}
avro::decode(*decoder_, sync_);
+ decoder_->init(*stream_);
+ blockStart_ = stream_->byteCount();
+}
+
+void DataFileReaderBase::seek(int64_t position) {
+ if (!eof_) {
+ dataDecoder_->init(*dataStream_);
+ drain(*dataStream_);
+ }
+ decoder_->init(*stream_);
+ stream_->seek(position);
+ eof_ = false;
+ readDataBlock();
+}
+
+void DataFileReaderBase::sync(int64_t position) {
+ if (!eof_) {
+ dataDecoder_->init(*dataStream_);
+ drain(*dataStream_);
+ }
+ decoder_->init(*stream_);
+ stream_->seek(position);
+ eof_ = false;
+ DataFileSync sync_buffer;
+ const uint8_t *p = 0;
+ size_t n = 0;
+ int i = 0;
+ while (i < DataFileSync::static_size) {
+ if (n == 0 && !stream_->next(&p, &n)) {
+ eof_ = true;
+ return;
+ }
+ int len =
+ std::min(static_cast<size_t>(DataFileSync::static_size - i), n);
+ memcpy(&sync_buffer[i], p, len);
+ p += len;
+ n -= len;
+ i += len;
+ }
+ for (;;) {
+ int j = 0;
+ for (; j < DataFileSync::static_size; ++j) {
+ if (sync_[j] != sync_buffer[(i + j) % DataFileSync::static_size]) {
+ break;
+ }
+ }
+ if (j == DataFileSync::static_size) {
+ // Found the sync marker!
+ break;
+ }
+ if (n == 0 && !stream_->next(&p, &n)) {
+ eof_ = true;
+ return;
+ }
+ sync_buffer[i++ % DataFileSync::static_size] = *p++;
+ --n;
+ }
+ stream_->backup(n);
+ readDataBlock();
+}
+
+bool DataFileReaderBase::pastSync(int64_t position) {
+ return !hasMore() || blockStart_ >= position + DataFileSync::static_size;
+}
+
+int64_t DataFileReaderBase::previousSync() {
+ return blockStart_;
}
} // namespace avro
diff --git a/lang/c++/impl/FileStream.cc b/lang/c++/impl/FileStream.cc
index 39c5af23c..63699682f 100644
--- a/lang/c++/impl/FileStream.cc
+++ b/lang/c++/impl/FileStream.cc
@@ -42,7 +42,7 @@ namespace avro {
namespace {
struct BufferCopyIn {
virtual ~BufferCopyIn() { }
- virtual void seek(size_t len) = 0;
+ virtual void seek(ssize_t len) = 0;
virtual bool read(uint8_t* b, size_t toRead, size_t& actual) = 0;
};
@@ -61,7 +61,7 @@ struct FileBufferCopyIn : public BufferCopyIn {
::CloseHandle(h_);
}
- void seek(size_t len) {
+ void seek(ssize_t len) {
if (::SetFilePointer(h_, len, NULL, FILE_CURRENT) !=
INVALID_SET_FILE_POINTER) {
throw Exception(boost::format("Cannot skip file: %1%") %
::GetLastError());
}
@@ -90,7 +90,7 @@ struct FileBufferCopyIn : public BufferCopyIn {
::close(fd_);
}
- void seek(size_t len) {
+ void seek(ssize_t len) {
off_t r = ::lseek(fd_, len, SEEK_CUR);
if (r == static_cast<off_t>(-1)) {
throw Exception(boost::format("Cannot skip file: %1%") %
@@ -116,7 +116,7 @@ struct IStreamBufferCopyIn : public BufferCopyIn {
IStreamBufferCopyIn(istream& is) : is_(is) {
}
- void seek(size_t len) {
+ void seek(ssize_t len) {
if (! is_.seekg(len, std::ios_base::cur)) {
throw Exception("Cannot skip stream");
}
@@ -135,7 +135,7 @@ struct IStreamBufferCopyIn : public BufferCopyIn {
}
-class BufferCopyInInputStream : public InputStream {
+class BufferCopyInInputStream : public SeekableInputStream {
const size_t bufferSize_;
uint8_t* const buffer_;
auto_ptr<BufferCopyIn> in_;
@@ -188,6 +188,13 @@ class BufferCopyInInputStream : public InputStream {
return false;
}
+ void seek(int64_t position) {
+ // BufferCopyIn::seek is relative to byteCount_, whereas position is
+ // absolute.
+ in_->seek(position - byteCount_ - available_);
+ byteCount_ = position;
+ available_ = 0;
+ }
public:
BufferCopyInInputStream(auto_ptr<BufferCopyIn>& in, size_t bufferSize) :
@@ -327,7 +334,16 @@ auto_ptr<InputStream> fileInputStream(const char* filename,
size_t bufferSize)
{
auto_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename));
- return auto_ptr<InputStream>( new BufferCopyInInputStream(in, bufferSize));
+ return auto_ptr<InputStream>(
+ new BufferCopyInInputStream(in, bufferSize));
+}
+
+auto_ptr<SeekableInputStream> fileSeekableInputStream(const char* filename,
+ size_t bufferSize)
+{
+ auto_ptr<BufferCopyIn> in(new FileBufferCopyIn(filename));
+ return auto_ptr<SeekableInputStream>(
+ new BufferCopyInInputStream(in, bufferSize));
}
auto_ptr<InputStream> istreamInputStream(istream& is,
diff --git a/lang/c++/test/CodecTests.cc b/lang/c++/test/CodecTests.cc
index f8bbe84d0..064351b3e 100644
--- a/lang/c++/test/CodecTests.cc
+++ b/lang/c++/test/CodecTests.cc
@@ -1363,9 +1363,17 @@ static const TestData4 data4BinaryOnly[] = {
#define COUNTOF(x) sizeof(x) / sizeof(x[0])
#define ENDOF(x) (x) + COUNTOF(x)
-#define ADD_TESTS(testSuite, Factory, testFunc, data) \
-testSuite.add(BOOST_PARAM_TEST_CASE(&testFunc<Factory>, \
- data, data + COUNTOF(data)))
+// Boost 1.67 and later expects test cases to have unique names. This dummy
+// helper functions leads to names which compose 'testFunc', 'Factory', and
+// 'data'.
+template <typename Test, typename Data>
+Test testWithData(const Test &test, const Data &data) {
+ boost::ignore_unused(data);
+ return test;
+}
+#define ADD_TESTS(testSuite, Factory, testFunc, data) \
+ testSuite.add(BOOST_PARAM_TEST_CASE( \
+ testWithData(&testFunc<Factory>, data), data, data + COUNTOF(data)))
struct BinaryEncoderFactory {
static EncoderPtr newEncoder(const ValidSchema& schema) {
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 27a7ce9ca..8d8c7b08e 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int_distribution.hpp>
#include <boost/test/included/unit_test_framework.hpp>
#include <boost/test/unit_test.hpp>
#include <boost/filesystem.hpp>
@@ -340,6 +342,97 @@ class DataFileTest {
BOOST_CHECK_EQUAL(i, count);
}
+ void testReaderSyncSeek() {
+ std::vector<int64_t> sync_points;
+ avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+ for (int64_t prev = 0; prev != df.previousSync(); df.sync(prev)) {
+ prev = df.previousSync();
+ sync_points.push_back(prev);
+ }
+ std::set<pair<int64_t, int64_t> > actual;
+ int num = 0;
+ for (int i = sync_points.size() - 2; i >= 0; --i) {
+ df.seek(sync_points[i]);
+ ComplexInteger ci;
+ // Subtract DataFileSync::static_size here because sync and
pastSync
+ // expect a point *at or before* the sync marker, whereas seek
+ // expects the point right *after* the sync marker.
+ while (!df.pastSync(sync_points[i + 1] -
+ avro::DataFileSync::static_size)) {
+ BOOST_CHECK(df.read(ci));
+ ++num;
+ actual.insert(std::make_pair(ci.re, ci.im));
+ }
+ }
+ df.close();
+ // We read 'count' total objects.
+ BOOST_CHECK_EQUAL(num, count);
+ // We read 'count' distinct objects.
+ BOOST_CHECK_EQUAL(actual.size(), count);
+ // They were the same objects initially written.
+ int64_t re = 3;
+ int64_t im = 5;
+ for (int i = 0; i < count; ++i, re *= im, im += 3) {
+ actual.insert(std::make_pair(re, im));
+ }
+ BOOST_CHECK_EQUAL(actual.size(), count);
+ }
+
+ void testReaderSyncDiscovery() {
+ std::set<int64_t> sync_points_syncing;
+ std::set<int64_t> sync_points_reading;
+ {
+ avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+ for (int64_t prev = 0; prev != df.previousSync(); df.sync(prev)) {
+ prev = df.previousSync();
+ sync_points_syncing.insert(prev);
+ }
+ df.close();
+ }
+ {
+ avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+ sync_points_reading.insert(df.previousSync());
+ ComplexInteger ci;
+ while (df.read(ci)) {
+ sync_points_reading.insert(df.previousSync());
+ }
+ sync_points_reading.insert(df.previousSync());
+ df.close();
+ }
+ BOOST_CHECK(sync_points_syncing == sync_points_reading);
+ // Just to make sure we're actually finding a reasonable number of
+ // splits.. rather than bugs like only find the first split.
+ BOOST_CHECK_GT(sync_points_syncing.size(), 10);
+ }
+
+ // This is a direct port of testSplits() from
+ // lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java.
+ void testReaderSplits() {
+ boost::mt19937 random(static_cast<uint32_t>(time(0)));
+ avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+ std::ifstream just_for_length(
+ filename, std::ifstream::ate | std::ifstream::binary);
+ int length = just_for_length.tellg();
+ int splits = 10;
+ int end = length; // end of split
+ int remaining = end; // bytes remaining
+ int actual_count = 0; // count of entries
+ while (remaining > 0) {
+ int start =
+ std::max(0, end - boost::random::uniform_int_distribution<>(
+ 0, 2 * length / splits)(random));
+ df.sync(start); // count entries in split
+ while (!df.pastSync(end)) {
+ ComplexInteger ci;
+ df.read(ci);
+ actual_count++;
+ }
+ remaining -= end - start;
+ end = start;
+ }
+ BOOST_CHECK_EQUAL(actual_count, count);
+ }
+
void testReadDouble() {
avro::DataFileReader<ComplexDouble> df(filename, writerSchema);
int i = 0;
@@ -492,46 +585,98 @@ void addReaderTests(test_suite* ts, const
shared_ptr<DataFileTest>& t)
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReaderGenericProjection,
t));
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
-
}
test_suite*
-init_unit_test_suite( int argc, char* argv[] )
+init_unit_test_suite(int argc, char *argv[])
{
- test_suite* ts= BOOST_TEST_SUITE("DataFile tests");
- shared_ptr<DataFileTest> t1(new DataFileTest("test1.df", sch, isch));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t1));
- addReaderTests(ts, t1);
-
- shared_ptr<DataFileTest> t2(new DataFileTest("test2.df", sch, isch));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteGeneric, t2));
- addReaderTests(ts, t2);
-
- shared_ptr<DataFileTest> t3(new DataFileTest("test3.df", dsch, dblsch));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteDouble, t3));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReadDouble, t3));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReadDoubleTwoStep, t3));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReadDoubleTwoStepProject,
- t3));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t3));
-
- shared_ptr<DataFileTest> t4(new DataFileTest("test4.df", dsch, dblsch));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testTruncate, t4));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t4));
-
- shared_ptr<DataFileTest> t5(new DataFileTest("test5.df", sch, isch));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteGenericByName, t5));
- addReaderTests(ts, t5);
-
- shared_ptr<DataFileTest> t6(new DataFileTest("test6.df", dsch, dblsch));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZip, t6));
- shared_ptr<DataFileTest> t8(new DataFileTest("test8.df", dsch, dblsch));
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test1.df");
+ shared_ptr<DataFileTest> t1(new DataFileTest("test1.df", sch, isch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t1));
+ addReaderTests(ts, t1);
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test2.df");
+ shared_ptr<DataFileTest> t2(new DataFileTest("test2.df", sch, isch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteGeneric, t2));
+ addReaderTests(ts, t2);
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test3.df");
+ shared_ptr<DataFileTest> t3(new DataFileTest("test3.df", dsch,
dblsch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteDouble, t3));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReadDouble, t3));
+ ts->add(
+ BOOST_CLASS_TEST_CASE(&DataFileTest::testReadDoubleTwoStep, t3));
+ ts->add(BOOST_CLASS_TEST_CASE(
+ &DataFileTest::testReadDoubleTwoStepProject, t3));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t3));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test4.df");
+ shared_ptr<DataFileTest> t4(new DataFileTest("test4.df", dsch,
dblsch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testTruncate, t4));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t4));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test5.df");
+ shared_ptr<DataFileTest> t5(new DataFileTest("test5.df", sch, isch));
+ ts->add(
+ BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteGenericByName, t5));
+ addReaderTests(ts, t5);
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test6.df");
+ shared_ptr<DataFileTest> t6(new DataFileTest("test6.df", dsch,
dblsch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testZip, t6));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test8.df");
+ shared_ptr<DataFileTest> t8(new DataFileTest("test8.df", dsch,
dblsch));
#ifdef SNAPPY_CODEC_AVAILABLE
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSnappy, t8));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSnappy, t8));
#endif
- shared_ptr<DataFileTest> t7(new DataFileTest("test7.df",fsch,fsch));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSchemaReadWrite,t7));
- ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup,t7));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test7.df");
+ shared_ptr<DataFileTest> t7(new DataFileTest("test7.df", fsch, fsch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testSchemaReadWrite, t7));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t7));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test9.df");
+ shared_ptr<DataFileTest> t9(new DataFileTest("test9.df", sch, sch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t9));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReaderSyncSeek, t9));
+ //ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t9));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test10.df");
+ shared_ptr<DataFileTest> t(new DataFileTest("test10.df", sch, sch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t));
+ ts->add(
+ BOOST_CLASS_TEST_CASE(&DataFileTest::testReaderSyncDiscovery, t));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
+ {
+ test_suite *ts = BOOST_TEST_SUITE("DataFile tests: test11.df");
+ shared_ptr<DataFileTest> t(new DataFileTest("test11.df", sch, sch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReaderSplits, t));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
+ boost::unit_test::framework::master_test_suite().add(ts);
+ }
- return ts;
+ return 0;
}
diff --git a/lang/c++/test/unittest.cc b/lang/c++/test/unittest.cc
index 5aef9b75a..e12e6954e 100644
--- a/lang/c++/test/unittest.cc
+++ b/lang/c++/test/unittest.cc
@@ -767,14 +767,6 @@ struct TestResolution
ValidSchema unionTwo_;
};
-
-template<typename T>
-void addTestCase(boost::unit_test::test_suite &test)
-{
- boost::shared_ptr<T> newtest( new T );
- test.add( BOOST_CLASS_TEST_CASE( &T::test, newtest ));
-}
-
boost::unit_test::test_suite*
init_unit_test_suite( int argc, char* argv[] )
{
@@ -782,12 +774,18 @@ init_unit_test_suite( int argc, char* argv[] )
test_suite* test= BOOST_TEST_SUITE( "Avro C++ unit test suite" );
- addTestCase<TestEncoding>(*test);
- addTestCase<TestSchema>(*test);
- addTestCase<TestNested>(*test);
- addTestCase<TestGenerated>(*test);
- addTestCase<TestBadStuff>(*test);
- addTestCase<TestResolution>(*test);
+ test->add(BOOST_CLASS_TEST_CASE(&TestEncoding::test,
+ boost::make_shared<TestEncoding>()));
+ test->add(BOOST_CLASS_TEST_CASE(&TestSchema::test,
+ boost::make_shared<TestSchema>()));
+ test->add(BOOST_CLASS_TEST_CASE(&TestNested::test,
+ boost::make_shared<TestNested>()));
+ test->add(BOOST_CLASS_TEST_CASE(&TestGenerated::test,
+ boost::make_shared<TestGenerated>()));
+ test->add(BOOST_CLASS_TEST_CASE(&TestBadStuff::test,
+ boost::make_shared<TestBadStuff>()));
+ test->add(BOOST_CLASS_TEST_CASE(&TestResolution::test,
+ boost::make_shared<TestResolution>()));
return test;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Support sync and seek in C++ DataFileReader
> -------------------------------------------
>
> Key: AVRO-2214
> URL: https://issues.apache.org/jira/browse/AVRO-2214
> Project: Avro
> Issue Type: Improvement
> Components: c++
> Affects Versions: 1.8.2
> Reporter: William Matthews
> Priority: Minor
>
> Java DataFileReader supports sync, seek, pastSync, etc. which allow parallel
> reads of files, and reasonably efficient "tailing" of files. It would be
> great if these were supported in C++ too.
> Also, I think this would serve as a bit of a workaround for
> https://issues.apache.org/jira/browse/AVRO-2178 (stat a file & see if it has
> grown, sync/seek, read, repeat).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)